diff options
69 files changed, 1028 insertions, 496 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index 19b7af606b..317846d2a2 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,74 @@ +Changes in synapse v0.28.1 (2018-05-01) +======================================= + +SECURITY UPDATE + +* Clamp the allowed values of event depth received over federation to be + [0, 2^63 - 1]. This mitigates an attack where malicious events + injected with depth = 2^63 - 1 render rooms unusable. Depth is used to + determine the cosmetic ordering of events within a room, and so the ordering + of events in such a room will default to using stream_ordering rather than depth + (topological_ordering). + + This is a temporary solution to mitigate abuse in the wild, whilst a long term solution + is being implemented to improve how the depth parameter is used. + + Full details at + https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI + +* Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API. + + +Changes in synapse v0.28.0 (2018-04-26) +======================================= + +Bug Fixes: + +* Fix quarantine media admin API and search reindex (PR #3130) +* Fix media admin APIs (PR #3134) + + +Changes in synapse v0.28.0-rc1 (2018-04-24) +=========================================== + +Minor performance improvement to federation sending and bug fixes. + +(Note: This release does not include the delta state resolution implementation discussed in matrix live) + + +Features: + +* Add metrics for event processing lag (PR #3090) +* Add metrics for ResponseCache (PR #3092) + +Changes: + +* Synapse on PyPy (PR #2760) Thanks to @Valodim! +* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel! +* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh! +* Document the behaviour of ResponseCache (PR #3059) +* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile! +* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel! +* use python3-compatible prints (PR #3074) Thanks to @NotAFile! +* Send federation events concurrently (PR #3078) +* Limit concurrent event sends for a room (PR #3079) +* Improve R30 stat definition (PR #3086) +* Send events to ASes concurrently (PR #3088) +* Refactor ResponseCache usage (PR #3093) +* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh! +* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile! +* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile! +* Refactor store.have_events (PR #3117) + +Bug Fixes: + +* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug! +* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080) +* fix federation_domain_whitelist (PR #3099) +* Avoid creating events with huge numbers of prev_events (PR #3113) +* Reject events which have lots of prev_events (PR #3118) + + Changes in synapse v0.27.4 (2018-04-13) ====================================== @@ -22,10 +93,10 @@ the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored. Changes in synapse v0.27.3-rc1 (2018-04-09) ======================================= -Notable changes include API support for joinability of groups. Also new metrics +Notable changes include API support for joinability of groups. Also new metrics and phone home stats. Phone home stats include better visibility of system usage so we can tweak synpase to work better for all users rather than our own experience -with matrix.org. Also, recording 'r30' stat which is the measure we use to track +with matrix.org. Also, recording 'r30' stat which is the measure we use to track overal growth of the Matrix ecosystem. It is defined as:- Counts the number of native 30 day retained users, defined as:- @@ -61,7 +132,6 @@ Bug fixes: * Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte! * Fix replication after switch to simplejson (PR #3015) -* Fix replication after switch to simplejson (PR #3015) * 404 correctly on missing paths via NoResource (PR #3022) * Fix error when claiming e2e keys from offline servers (PR #3034) * fix tests/storage/test_user_directory.py (PR #3042) diff --git a/README.rst b/README.rst index 8812cc1b4f..28fbe45de6 100644 --- a/README.rst +++ b/README.rst @@ -614,6 +614,9 @@ should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port> $ dig -t srv _matrix._tcp.example.com _matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com. +Note that the server hostname cannot be an alias (CNAME record): it has to point +directly to the server hosting the synapse instance. + You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in its user-ids, by setting ``server_name``:: diff --git a/contrib/README.rst b/contrib/README.rst new file mode 100644 index 0000000000..c296c55628 --- /dev/null +++ b/contrib/README.rst @@ -0,0 +1,10 @@ +Community Contributions +======================= + +Everything in this directory are projects submitted by the community that may be useful +to others. As such, the project maintainers cannot guarantee support, stability +or backwards compatibility of these projects. + +Files in this directory should *not* be relied on directly, as they may not +continue to work or exist in future. If you wish to use any of these files then +they should be copied to avoid them breaking from underneath you. diff --git a/contrib/graph/graph3.py b/contrib/graph/graph3.py index 88d92c89d7..7d3b4d7eb6 100644 --- a/contrib/graph/graph3.py +++ b/contrib/graph/graph3.py @@ -22,6 +22,8 @@ import argparse from synapse.events import FrozenEvent from synapse.util.frozenutils import unfreeze +from six import string_types + def make_graph(file_name, room_id, file_prefix, limit): print "Reading lines" @@ -58,7 +60,7 @@ def make_graph(file_name, room_id, file_prefix, limit): for key, value in unfreeze(event.get_dict()["content"]).items(): if value is None: value = "<null>" - elif isinstance(value, basestring): + elif isinstance(value, string_types): pass else: value = json.dumps(value) diff --git a/contrib/prometheus/consoles/synapse.html b/contrib/prometheus/consoles/synapse.html index e23d8a1fce..69aa87f85e 100644 --- a/contrib/prometheus/consoles/synapse.html +++ b/contrib/prometheus/consoles/synapse.html @@ -202,11 +202,11 @@ new PromConsole.Graph({ <h1>Requests</h1> <h3>Requests by Servlet</h3> -<div id="synapse_http_server_requests_servlet"></div> +<div id="synapse_http_server_request_count_servlet"></div> <script> new PromConsole.Graph({ - node: document.querySelector("#synapse_http_server_requests_servlet"), - expr: "rate(synapse_http_server_requests:servlet[2m])", + node: document.querySelector("#synapse_http_server_request_count_servlet"), + expr: "rate(synapse_http_server_request_count:servlet[2m])", name: "[[servlet]]", yAxisFormatter: PromConsole.NumberFormatter.humanize, yHoverFormatter: PromConsole.NumberFormatter.humanize, @@ -215,11 +215,11 @@ new PromConsole.Graph({ }) </script> <h4> (without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4> -<div id="synapse_http_server_requests_servlet_minus_events"></div> +<div id="synapse_http_server_request_count_servlet_minus_events"></div> <script> new PromConsole.Graph({ - node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"), - expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])", + node: document.querySelector("#synapse_http_server_request_count_servlet_minus_events"), + expr: "rate(synapse_http_server_request_count:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])", name: "[[servlet]]", yAxisFormatter: PromConsole.NumberFormatter.humanize, yHoverFormatter: PromConsole.NumberFormatter.humanize, @@ -233,7 +233,7 @@ new PromConsole.Graph({ <script> new PromConsole.Graph({ node: document.querySelector("#synapse_http_server_response_time_avg"), - expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000", + expr: "rate(synapse_http_server_response_time_seconds[2m]) / rate(synapse_http_server_response_count[2m]) / 1000", name: "[[servlet]]", yAxisFormatter: PromConsole.NumberFormatter.humanize, yHoverFormatter: PromConsole.NumberFormatter.humanize, @@ -276,7 +276,7 @@ new PromConsole.Graph({ <script> new PromConsole.Graph({ node: document.querySelector("#synapse_http_server_response_ru_utime"), - expr: "rate(synapse_http_server_response_ru_utime:total[2m])", + expr: "rate(synapse_http_server_response_ru_utime_seconds[2m])", name: "[[servlet]]", yAxisFormatter: PromConsole.NumberFormatter.humanize, yHoverFormatter: PromConsole.NumberFormatter.humanize, @@ -291,7 +291,7 @@ new PromConsole.Graph({ <script> new PromConsole.Graph({ node: document.querySelector("#synapse_http_server_response_db_txn_duration"), - expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])", + expr: "rate(synapse_http_server_response_db_txn_duration_seconds[2m])", name: "[[servlet]]", yAxisFormatter: PromConsole.NumberFormatter.humanize, yHoverFormatter: PromConsole.NumberFormatter.humanize, @@ -306,7 +306,7 @@ new PromConsole.Graph({ <script> new PromConsole.Graph({ node: document.querySelector("#synapse_http_server_send_time_avg"), - expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000", + expr: "rate(synapse_http_server_response_time_second{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_count{servlet='RoomSendEventRestServlet'}[2m]) / 1000", name: "[[servlet]]", yAxisFormatter: PromConsole.NumberFormatter.humanize, yHoverFormatter: PromConsole.NumberFormatter.humanize, diff --git a/contrib/prometheus/synapse-v1.rules b/contrib/prometheus/synapse-v1.rules index b6f84174b0..4c900ba537 100644 --- a/contrib/prometheus/synapse-v1.rules +++ b/contrib/prometheus/synapse-v1.rules @@ -1,10 +1,10 @@ synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0) synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0) -synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method) -synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet) +synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method) +synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet) -synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet) +synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet) synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m]) synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s]) diff --git a/contrib/prometheus/synapse-v2.rules b/contrib/prometheus/synapse-v2.rules index 07e37a885e..6ccca2daaf 100644 --- a/contrib/prometheus/synapse-v2.rules +++ b/contrib/prometheus/synapse-v2.rules @@ -5,19 +5,19 @@ groups: expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)" - record: "synapse_federation_transaction_queue_pendingPdus:total" expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)" - - record: 'synapse_http_server_requests:method' + - record: 'synapse_http_server_request_count:method' labels: servlet: "" - expr: "sum(synapse_http_server_requests) by (method)" - - record: 'synapse_http_server_requests:servlet' + expr: "sum(synapse_http_server_request_count) by (method)" + - record: 'synapse_http_server_request_count:servlet' labels: method: "" - expr: 'sum(synapse_http_server_requests) by (servlet)' + expr: 'sum(synapse_http_server_request_count) by (servlet)' - - record: 'synapse_http_server_requests:total' + - record: 'synapse_http_server_request_count:total' labels: servlet: "" - expr: 'sum(synapse_http_server_requests:by_method) by (servlet)' + expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)' - record: 'synapse_cache:hit_ratio_5m' expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])' diff --git a/contrib/systemd/synapse.service b/contrib/systemd/synapse.service index 3f037055b9..b81ce3915d 100644 --- a/contrib/systemd/synapse.service +++ b/contrib/systemd/synapse.service @@ -2,6 +2,9 @@ # (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux) # rather than in a user home directory or similar under virtualenv. +# **NOTE:** This is an example service file that may change in the future. If you +# wish to use this please copy rather than symlink it. + [Unit] Description=Synapse Matrix homeserver @@ -12,6 +15,7 @@ Group=synapse WorkingDirectory=/var/lib/synapse ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml +# EnvironmentFile=-/etc/sysconfig/synapse # Can be used to e.g. set SYNAPSE_CACHE_FACTOR [Install] WantedBy=multi-user.target diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 7b23a44854..b9b828c154 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -30,6 +30,8 @@ import time import traceback import yaml +from six import string_types + logger = logging.getLogger("synapse_port_db") @@ -574,7 +576,7 @@ class Porter(object): def conv(j, col): if j in bool_cols: return bool(col) - elif isinstance(col, basestring) and "\0" in col: + elif isinstance(col, string_types) and "\0" in col: logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col) raise BadValueException(); return col diff --git a/synapse/__init__.py b/synapse/__init__.py index 0c1c16b9a4..f31cb9a3cb 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.27.4" +__version__ = "0.28.1" diff --git a/synapse/api/auth.py b/synapse/api/auth.py index ac0a3655a5..f17fda6315 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -204,8 +204,8 @@ class Auth(object): ip_addr = self.hs.get_ip_from_request(request) user_agent = request.requestHeaders.getRawHeaders( - "User-Agent", - default=[""] + b"User-Agent", + default=[b""] )[0] if user and access_token and ip_addr: self.store.insert_client_ip( @@ -672,7 +672,7 @@ def has_access_token(request): bool: False if no access_token was given, True otherwise. """ query_params = request.args.get("access_token") - auth_headers = request.requestHeaders.getRawHeaders("Authorization") + auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") return bool(query_params) or bool(auth_headers) @@ -692,8 +692,8 @@ def get_access_token_from_request(request, token_not_found_http_status=401): AuthError: If there isn't an access_token in the request. """ - auth_headers = request.requestHeaders.getRawHeaders("Authorization") - query_params = request.args.get("access_token") + auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") + query_params = request.args.get(b"access_token") if auth_headers: # Try the get the access_token from a "Authorization: Bearer" # header diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 489efb7f86..5baba43966 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -16,6 +16,9 @@ """Contains constants from the specification.""" +# the "depth" field on events is limited to 2**63 - 1 +MAX_DEPTH = 2**63 - 1 + class Membership(object): diff --git a/synapse/api/errors.py b/synapse/api/errors.py index bee59e80dd..a9ff5576f3 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -18,6 +18,7 @@ import logging import simplejson as json +from six import iteritems logger = logging.getLogger(__name__) @@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs): A dict representing the error response JSON. """ err = {"error": msg, "errcode": code} - for key, value in kwargs.iteritems(): + for key, value in iteritems(kwargs): err[key] = value return err diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index de889357c3..b349e3e3ce 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -90,7 +90,7 @@ class KeyUploadServlet(RestServlet): # They're actually trying to upload something, proxy to main synapse. # Pass through the auth headers, if any, in case the access token # is there. - auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) + auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) headers = { "Authorization": auth_headers, } diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 508b66613d..2fddcd935a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -58,6 +58,8 @@ from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor from twisted.web.resource import NoResource +from six import iteritems + logger = logging.getLogger("synapse.app.synchrotron") @@ -211,7 +213,7 @@ class SynchrotronPresence(object): def get_currently_syncing_users(self): return [ - user_id for user_id, count in self.user_to_num_current_syncs.iteritems() + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) if count > 0 ] diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index d5a7a5ce2f..5fdb579723 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -21,6 +21,8 @@ from twisted.internet import defer import logging import re +from six import string_types + logger = logging.getLogger(__name__) @@ -146,7 +148,7 @@ class ApplicationService(object): ) regex = regex_obj.get("regex") - if isinstance(regex, basestring): + if isinstance(regex, string_types): regex_obj["regex"] = re.compile(regex) # Pre-compile regex else: raise ValueError( diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 40c433d7ae..00efff1464 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind from synapse.api.errors import CodeMessageException from synapse.http.client import SimpleHttpClient from synapse.events.utils import serialize_event -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.caches.response_cache import ResponseCache from synapse.types import ThirdPartyInstanceID @@ -73,7 +72,8 @@ class ApplicationServiceApi(SimpleHttpClient): super(ApplicationServiceApi, self).__init__(hs) self.clock = hs.get_clock() - self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS) + self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta", + timeout_ms=HOUR_IN_MS) @defer.inlineCallbacks def query_user(self, service, user_id): @@ -193,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(None) key = (service.id, protocol) - result = self.protocol_meta_cache.get(key) - if not result: - result = self.protocol_meta_cache.set( - key, preserve_fn(_get)() - ) - return make_deferred_yieldable(result) + return self.protocol_meta_cache.wrap(key, _get) @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): diff --git a/synapse/config/_base.py b/synapse/config/_base.py index fa105bce72..32b439d20a 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -19,6 +19,8 @@ import os import yaml from textwrap import dedent +from six import integer_types + class ConfigError(Exception): pass @@ -49,7 +51,7 @@ Missing mandatory `server_name` config option. class Config(object): @staticmethod def parse_size(value): - if isinstance(value, int) or isinstance(value, long): + if isinstance(value, integer_types): return value sizes = {"K": 1024, "M": 1024 * 1024} size = 1 @@ -61,7 +63,7 @@ class Config(object): @staticmethod def parse_duration(value): - if isinstance(value, int) or isinstance(value, long): + if isinstance(value, integer_types): return value second = 1000 minute = 60 * second @@ -288,22 +290,22 @@ class Config(object): ) obj.invoke_all("generate_files", config) config_file.write(config_bytes) - print ( + print(( "A config file has been generated in %r for server name" " %r with corresponding SSL keys and self-signed" " certificates. Please review this file and customise it" " to your needs." - ) % (config_path, server_name) - print ( + ) % (config_path, server_name)) + print( "If this server name is incorrect, you will need to" " regenerate the SSL certificates" ) return else: - print ( + print(( "Config file %r already exists. Generating any missing key" " files." - ) % (config_path,) + ) % (config_path,)) generate_keys = True parser = argparse.ArgumentParser( diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index aba0aec6e8..9a2359b6fd 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -21,6 +21,8 @@ import urllib import yaml import logging +from six import string_types + logger = logging.getLogger(__name__) @@ -89,14 +91,14 @@ def _load_appservice(hostname, as_info, config_filename): "id", "as_token", "hs_token", "sender_localpart" ] for field in required_string_fields: - if not isinstance(as_info.get(field), basestring): + if not isinstance(as_info.get(field), string_types): raise KeyError("Required string field: '%s' (%s)" % ( field, config_filename, )) # 'url' must either be a string or explicitly null, not missing # to avoid accidentally turning off push for ASes. - if (not isinstance(as_info.get("url"), basestring) and + if (not isinstance(as_info.get("url"), string_types) and as_info.get("url", "") is not None): raise KeyError( "Required string field or explicit null: 'url' (%s)" % (config_filename,) @@ -128,7 +130,7 @@ def _load_appservice(hostname, as_info, config_filename): "Expected namespace entry in %s to be an object," " but got %s", ns, regex_obj ) - if not isinstance(regex_obj.get("regex"), basestring): + if not isinstance(regex_obj.get("regex"), string_types): raise ValueError( "Missing/bad type 'regex' key in %s", regex_obj ) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 35f810b07b..fce83d445f 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -352,7 +352,7 @@ class Keyring(object): logger.exception( "Unable to get key from %r: %s %s", perspective_name, - type(e).__name__, str(e.message), + type(e).__name__, str(e), ) defer.returnValue({}) @@ -384,7 +384,7 @@ class Keyring(object): logger.info( "Unable to get key %r for %r directly: %s %s", key_ids, server_name, - type(e).__name__, str(e.message), + type(e).__name__, str(e), ) if not keys: @@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request): except IOError as e: logger.warn( "Got IOError when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), + server_name, type(e).__name__, str(e), ) raise SynapseError( 502, @@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request): except Exception as e: logger.exception( "Got Exception when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), + server_name, type(e).__name__, str(e), ) raise SynapseError( 401, diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 79eaa31031..4cc98a3fe8 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -14,7 +14,10 @@ # limitations under the License. import logging -from synapse.api.errors import SynapseError +import six + +from synapse.api.constants import MAX_DEPTH +from synapse.api.errors import SynapseError, Codes from synapse.crypto.event_signing import check_event_content_hash from synapse.events import FrozenEvent from synapse.events.utils import prune_event @@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False): FrozenEvent Raises: - SynapseError: if the pdu is missing required fields + SynapseError: if the pdu is missing required fields or is otherwise + not a valid matrix event """ # we could probably enforce a bunch of other fields here (room_id, sender, # origin, etc etc) - assert_params_in_request(pdu_json, ('event_id', 'type')) + assert_params_in_request(pdu_json, ('event_id', 'type', 'depth')) + + depth = pdu_json['depth'] + if not isinstance(depth, six.integer_types): + raise SynapseError(400, "Depth %r not an intger" % (depth, ), + Codes.BAD_JSON) + + if depth < 0: + raise SynapseError(400, "Depth too small", Codes.BAD_JSON) + elif depth > MAX_DEPTH: + raise SynapseError(400, "Depth too large", Codes.BAD_JSON) + event = FrozenEvent( pdu_json ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 38440da5b5..8e2c0c4cd2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -394,7 +394,7 @@ class FederationClient(FederationBase): seen_events = yield self.store.get_events(event_ids, allow_rejected=True) signed_events = seen_events.values() else: - seen_events = yield self.store.have_events(event_ids) + seen_events = yield self.store.have_seen_events(event_ids) signed_events = [] failed_to_fetch = set() diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bea7fd0b71..247ddc89d5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,9 +31,10 @@ import synapse.metrics from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logutils import log_function +from six import iteritems + # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. TRANSACTION_CONCURRENCY_LIMIT = 10 @@ -65,7 +67,7 @@ class FederationServer(FederationBase): # We cache responses to state queries, as they take a while and often # come in waves. - self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) + self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000) @defer.inlineCallbacks @log_function @@ -212,16 +214,17 @@ class FederationServer(FederationBase): if not in_room: raise AuthError(403, "Host not in room.") - result = self._state_resp_cache.get((room_id, event_id)) - if not result: - with (yield self._server_linearizer.queue((origin, room_id))): - d = self._state_resp_cache.set( - (room_id, event_id), - preserve_fn(self._on_context_state_request_compute)(room_id, event_id) - ) - resp = yield make_deferred_yieldable(d) - else: - resp = yield make_deferred_yieldable(result) + # we grab the linearizer to protect ourselves from servers which hammer + # us. In theory we might already have the response to this query + # in the cache so we could return it without waiting for the linearizer + # - but that's non-trivial to get right, and anyway somewhat defeats + # the point of the linearizer. + with (yield self._server_linearizer.queue((origin, room_id))): + resp = yield self._state_resp_cache.wrap( + (room_id, event_id), + self._on_context_state_request_compute, + room_id, event_id, + ) defer.returnValue((200, resp)) @@ -425,9 +428,9 @@ class FederationServer(FederationBase): "Claimed one-time-keys: %s", ",".join(( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in json_result.iteritems() - for device_id, device_keys in user_keys.iteritems() - for key_id, _ in device_keys.iteritems() + for user_id, user_keys in iteritems(json_result) + for device_id, device_keys in iteritems(user_keys) + for key_id, _ in iteritems(device_keys) )), ) @@ -494,13 +497,33 @@ class FederationServer(FederationBase): def _handle_received_pdu(self, origin, pdu): """ Process a PDU received in a federation /send/ transaction. + If the event is invalid, then this method throws a FederationError. + (The error will then be logged and sent back to the sender (which + probably won't do anything with it), and other events in the + transaction will be processed as normal). + + It is likely that we'll then receive other events which refer to + this rejected_event in their prev_events, etc. When that happens, + we'll attempt to fetch the rejected event again, which will presumably + fail, so those second-generation events will also get rejected. + + Eventually, we get to the point where there are more than 10 events + between any new events and the original rejected event. Since we + only try to backfill 10 events deep on received pdu, we then accept the + new event, possibly introducing a discontinuity in the DAG, with new + forward extremities, so normal service is approximately returned, + until we try to backfill across the discontinuity. + Args: origin (str): server which sent the pdu pdu (FrozenEvent): received pdu Returns (Deferred): completes with None - Raises: FederationError if the signatures / hash do not match - """ + + Raises: FederationError if the signatures / hash do not match, or + if the event was unacceptable for any other reason (eg, too large, + too many prev_events, couldn't find the prev_events) + """ # check that it's actually being sent from a valid destination to # workaround bug #1753 in 0.18.5 and 0.18.6 if origin != get_domain_from_id(pdu.event_id): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 93e5acebc1..0f0c687b37 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -40,6 +40,8 @@ from collections import namedtuple import logging +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object): user_ids = set( user_id - for uids in self.presence_changed.itervalues() + for uids in itervalues(self.presence_changed) for user_id in uids ) @@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object): # stream position. keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} - for ((destination, edu_key), pos) in keyed_edus.iteritems(): + for ((destination, edu_key), pos) in iteritems(keyed_edus): rows.append((pos, KeyedEduRow( key=edu_key, edu=self.keyed_edu[(destination, edu_key)], @@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object): j = keys.bisect_right(to_token) + 1 device_messages = {self.device_messages[k]: k for k in keys[i:j]} - for (destination, pos) in device_messages.iteritems(): + for (destination, pos) in iteritems(device_messages): rows.append((pos, DeviceRow( destination=destination, ))) @@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows): if buff.presence: transaction_queue.send_presence(buff.presence) - for destination, edu_map in buff.keyed_edus.iteritems(): + for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=key, ) - for destination, edu_list in buff.edus.iteritems(): + for destination, edu_list in iteritems(buff.edus): for edu in edu_list: transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=None, ) - for destination, failure_list in buff.failures.iteritems(): + for destination, failure_list in iteritems(buff.failures): for failure in failure_list: transaction_queue.send_failure(destination, failure) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a141ec9953..963d938edd 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -169,7 +169,7 @@ class TransactionQueue(object): while True: last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( - last_token, self._last_poked_id, limit=20, + last_token, self._last_poked_id, limit=100, ) logger.debug("Handling %s -> %s", last_token, next_token) @@ -177,24 +177,33 @@ class TransactionQueue(object): if not events and next_token >= self._last_poked_id: break - for event in events: + @defer.inlineCallbacks + def handle_event(event): # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.event_id) if not is_mine and send_on_behalf_of is None: - continue - - # Get the state from before the event. - # We need to make sure that this is the state from before - # the event and not from after it. - # Otherwise if the last member on a server in a room is - # banned then it won't receive the event because it won't - # be in the room after the ban. - destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=[ - prev_id for prev_id, _ in event.prev_events - ], - ) + return + + try: + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. + destinations = yield self.state.get_current_hosts_in_room( + event.room_id, latest_event_ids=[ + prev_id for prev_id, _ in event.prev_events + ], + ) + except Exception: + logger.exception( + "Failed to calculate hosts in room for event: %s", + event.event_id, + ) + return + destinations = set(destinations) if send_on_behalf_of is not None: @@ -207,12 +216,44 @@ class TransactionQueue(object): self._send_pdu(event, destinations) - events_processed_counter.inc_by(len(events)) + @defer.inlineCallbacks + def handle_room_events(events): + for event in events: + yield handle_event(event) + + events_by_room = {} + for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.run_in_background(handle_room_events, evs) + for evs in events_by_room.itervalues() + ], + consumeErrors=True + )) yield self.store.update_federation_out_pos( "events", next_token ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4c94d5a36c..ff0656df3e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,12 +94,6 @@ class Authenticator(object): "signatures": {}, } - if ( - self.federation_domain_whitelist is not None and - self.server_name not in self.federation_domain_whitelist - ): - raise FederationDeniedError(self.server_name) - if content is not None: json_request["content"] = content @@ -138,6 +132,12 @@ class Authenticator(object): json_request["origin"] = origin json_request["signatures"].setdefault(origin, {})[key] = sig + if ( + self.federation_domain_whitelist is not None and + origin not in self.federation_domain_whitelist + ): + raise FederationDeniedError(origin) + if not json_request["signatures"]: raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..0245197c02 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -18,7 +18,9 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import ( + make_deferred_yieldable, preserve_fn, run_in_background, +) import logging @@ -84,11 +86,16 @@ class ApplicationServicesHandler(object): if not events: break + events_by_room = {} for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + @defer.inlineCallbacks + def handle_event(event): # Gather interested services services = yield self._get_services_for_event(event) if len(services) == 0: - continue # no services need notifying + return # no services need notifying # Do we know this user exists? If not, poke the user # query API for all services which match that user regex. @@ -108,9 +115,33 @@ class ApplicationServicesHandler(object): service, event ) - events_processed_counter.inc_by(len(events)) + @defer.inlineCallbacks + def handle_room_events(events): + for event in events: + yield handle_event(event) + + yield make_deferred_yieldable(defer.gatherResults([ + run_in_background(handle_room_events, evs) + for evs in events_by_room.itervalues() + ], consumeErrors=True)) yield self.store.set_appservice_last_pos(upper_bound) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) + + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 080aca3d71..ae7e0d6da2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -15,8 +15,14 @@ # limitations under the License. """Contains handlers for federation events.""" + +import httplib +import itertools +import logging + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json +from twisted.internet import defer from unpaddedbase64 import decode_base64 from ._base import BaseHandler @@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.distributor import user_joined_room -from twisted.internet import defer - -import itertools -import logging logger = logging.getLogger(__name__) @@ -115,6 +117,19 @@ class FederationHandler(BaseHandler): logger.debug("Already seen pdu %s", pdu.event_id) return + # do some initial sanity-checking of the event. In particular, make + # sure it doesn't have hundreds of prev_events or auth_events, which + # could cause a huge state resolution or cascade of event fetches. + try: + self._sanity_check_event(pdu) + except SynapseError as err: + raise FederationError( + "ERROR", + err.code, + err.msg, + affected=pdu.event_id, + ) + # If we are currently in the process of joining this room, then we # queue up events for later processing. if pdu.room_id in self.room_queues: @@ -149,10 +164,6 @@ class FederationHandler(BaseHandler): auth_chain = [] - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - fetch_state = False # Get missing pdus if necessary. @@ -168,7 +179,7 @@ class FederationHandler(BaseHandler): ) prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this @@ -196,8 +207,7 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.iterkeys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: logger.info( @@ -248,8 +258,7 @@ class FederationHandler(BaseHandler): min_depth (int): Minimum depth of events to return. """ # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return @@ -361,9 +370,7 @@ class FederationHandler(BaseHandler): if auth_chain: event_ids |= {e.event_id for e in auth_chain} - seen_ids = set( - (yield self.store.have_events(event_ids)).keys() - ) + seen_ids = yield self.store.have_seen_events(event_ids) if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication @@ -527,9 +534,16 @@ class FederationHandler(BaseHandler): def backfill(self, dest, room_id, limit, extremities): """ Trigger a backfill request to `dest` for the given `room_id` - This will attempt to get more events from the remote. This may return - be successfull and still return no events if the other side has no new - events to offer. + This will attempt to get more events from the remote. If the other side + has no new events to offer, this will return an empty list. + + As the events are received, we check their signatures, and also do some + sanity-checking on them. If any of the backfilled events are invalid, + this method throws a SynapseError. + + TODO: make this more useful to distinguish failures of the remote + server from invalid events (there is probably no point in trying to + re-fetch invalid events from every other HS in the room.) """ if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") @@ -541,6 +555,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # ideally we'd sanity check the events here for excess prev_events etc, + # but it's hard to reject events at this point without completely + # breaking backfill in the same way that it is currently broken by + # events whose signature we cannot verify (#3121). + # + # So for now we accept the events anyway. #3124 tracks this. + # + # for ev in events: + # self._sanity_check_event(ev) + # Don't bother processing events we already have. seen_events = yield self.store.have_events_in_timeline( set(e.event_id for e in events) @@ -633,7 +657,7 @@ class FederationHandler(BaseHandler): failed_to_fetch = missing_auth - set(auth_events) - seen_events = yield self.store.have_events( + seen_events = yield self.store.have_seen_events( set(auth_events.keys()) | set(state_events.keys()) ) @@ -843,6 +867,38 @@ class FederationHandler(BaseHandler): defer.returnValue(False) + def _sanity_check_event(self, ev): + """ + Do some early sanity checks of a received event + + In particular, checks it doesn't have an excessive number of + prev_events or auth_events, which could cause a huge state resolution + or cascade of event fetches. + + Args: + ev (synapse.events.EventBase): event to be checked + + Returns: None + + Raises: + SynapseError if the event does not pass muster + """ + if len(ev.prev_events) > 20: + logger.warn("Rejecting event %s which has %i prev_events", + ev.event_id, len(ev.prev_events)) + raise SynapseError( + httplib.BAD_REQUEST, + "Too many prev_events", + ) + + if len(ev.auth_events) > 10: + logger.warn("Rejecting event %s which has %i auth_events", + ev.event_id, len(ev.auth_events)) + raise SynapseError( + httplib.BAD_REQUEST, + "Too many auth_events", + ) + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -1736,7 +1792,8 @@ class FederationHandler(BaseHandler): event_key = None if event_auth_events - current_state: - have_events = yield self.store.have_events( + # TODO: can we use store.have_seen_events here instead? + have_events = yield self.store.get_seen_events_with_rejections( event_auth_events - current_state ) else: @@ -1759,12 +1816,12 @@ class FederationHandler(BaseHandler): origin, event.room_id, event.event_id ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in remote_auth_chain] ) for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): + if e.event_id in seen_remotes: continue if e.event_id == event.event_id: @@ -1791,7 +1848,7 @@ class FederationHandler(BaseHandler): except AuthError: pass - have_events = yield self.store.have_events( + have_events = yield self.store.get_seen_events_with_rejections( [e_id for e_id, _ in event.auth_events] ) seen_events = set(have_events.keys()) @@ -1876,13 +1933,13 @@ class FederationHandler(BaseHandler): local_auth_chain, ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in result["auth_chain"]] ) # 3. Process any remote auth chain events we haven't seen. for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): + if ev.event_id in seen_remotes: continue if ev.event_id == event.event_id: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6de6e13b7b..53beb2b9ab 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer, reactor from twisted.python.failure import Failure -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, MAX_DEPTH from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event @@ -37,7 +37,6 @@ from ._base import BaseHandler from canonicaljson import encode_canonical_json import logging -import random import simplejson logger = logging.getLogger(__name__) @@ -433,7 +432,7 @@ class EventCreationHandler(object): @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, - prev_event_ids=None): + prev_events_and_hashes=None): """ Given a dict from a client, create a new event. @@ -447,47 +446,52 @@ class EventCreationHandler(object): event_dict (dict): An entire event token_id (str) txn_id (str) - prev_event_ids (list): The prev event ids to use when creating the event + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. Returns: Tuple of created event (FrozenEvent), Context """ builder = self.event_builder_factory.new(event_dict) - with (yield self.limiter.queue(builder.room_id)): - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) - - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.profile_handler - content = builder.content - - try: - if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.profile_handler + content = builder.content + + try: + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) - if token_id is not None: - builder.internal_metadata.token_id = token_id + if token_id is not None: + builder.internal_metadata.token_id = token_id - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id - event, context = yield self.create_new_client_event( - builder=builder, - requester=requester, - prev_event_ids=prev_event_ids, - ) + event, context = yield self.create_new_client_event( + builder=builder, + requester=requester, + prev_events_and_hashes=prev_events_and_hashes, + ) defer.returnValue((event, context)) @@ -557,64 +561,80 @@ class EventCreationHandler(object): See self.create_event and self.send_nonmember_event. """ - event, context = yield self.create_event( - requester, - event_dict, - token_id=requester.access_token_id, - txn_id=txn_id - ) - spam_error = self.spam_checker.check_event_for_spam(event) - if spam_error: - if not isinstance(spam_error, basestring): - spam_error = "Spam is not permitted here" - raise SynapseError( - 403, spam_error, Codes.FORBIDDEN + # We limit the number of concurrent event sends in a room so that we + # don't fork the DAG too much. If we don't limit then we can end up in + # a situation where event persistence can't keep up, causing + # extremities to pile up, which in turn leads to state resolution + # taking longer. + with (yield self.limiter.queue(event_dict["room_id"])): + event, context = yield self.create_event( + requester, + event_dict, + token_id=requester.access_token_id, + txn_id=txn_id ) - yield self.send_nonmember_event( - requester, - event, - context, - ratelimit=ratelimit, - ) + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, basestring): + spam_error = "Spam is not permitted here" + raise SynapseError( + 403, spam_error, Codes.FORBIDDEN + ) + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=ratelimit, + ) defer.returnValue(event) @measure_func("create_new_client_event") @defer.inlineCallbacks - def create_new_client_event(self, builder, requester=None, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) + def create_new_client_event(self, builder, requester=None, + prev_events_and_hashes=None): + """Create a new event for a local client - # We want to limit the max number of prev events we point to in our - # new event - if len(latest_ret) > 10: - # Sort by reverse depth, so we point to the most recent. - latest_ret.sort(key=lambda a: -a[2]) - new_latest_ret = latest_ret[:5] - - # We also randomly point to some of the older events, to make - # sure that we don't completely ignore the older events. - if latest_ret[5:]: - sample_size = min(5, len(latest_ret[5:])) - new_latest_ret.extend(random.sample(latest_ret[5:], sample_size)) - latest_ret = new_latest_ret - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 + Args: + builder (EventBuilder): + + requester (synapse.types.Requester|None): + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. + + Returns: + Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)] + """ + + if prev_events_and_hashes is not None: + assert len(prev_events_and_hashes) <= 10, \ + "Attempting to create an event with %i prev_events" % ( + len(prev_events_and_hashes), + ) + else: + prev_events_and_hashes = \ + yield self.store.get_prev_events_for_room(builder.room_id) + + if prev_events_and_hashes: + depth = max([d for _, _, d in prev_events_and_hashes]) + 1 + # we cap depth of generated events, to ensure that they are not + # rejected by other servers (and so that they can be persisted in + # the db) + depth = min(depth, MAX_DEPTH) + else: + depth = 1 - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in prev_events_and_hashes + ] builder.prev_events = prev_events builder.depth = depth diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index dd03705279..f83c6b3cf8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -23,7 +23,7 @@ from synapse.api.errors import ( ) from synapse.http.client import CaptchaServerHttpClient from synapse import types -from synapse.types import UserID +from synapse.types import UserID, create_requester, RoomID, RoomAlias from synapse.util.async import run_on_reactor, Linearizer from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler @@ -205,10 +205,17 @@ class RegistrationHandler(BaseHandler): token = None attempts += 1 + # auto-join the user to any rooms we're supposed to dump them into + fake_requester = create_requester(user_id) + for r in self.hs.config.auto_join_rooms: + try: + yield self._join_user_to_room(fake_requester, r) + except Exception as e: + logger.error("Failed to join new user to %r: %r", r, e) + # We used to generate default identicons here, but nowadays # we want clients to generate their own as part of their branding # rather than there being consistent matrix-wide ones, so we don't. - defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -483,3 +490,28 @@ class RegistrationHandler(BaseHandler): ) defer.returnValue((user_id, access_token)) + + @defer.inlineCallbacks + def _join_user_to_room(self, requester, room_identifier): + room_id = None + room_member_handler = self.hs.get_room_member_handler() + if RoomID.is_valid(room_identifier): + room_id = room_identifier + elif RoomAlias.is_valid(room_identifier): + room_alias = RoomAlias.from_string(room_identifier) + room_id, remote_room_hosts = ( + yield room_member_handler.lookup_room_alias(room_alias) + ) + room_id = room_id.to_string() + else: + raise SynapseError(400, "%s was not legal room ID or room alias" % ( + room_identifier, + )) + + yield room_member_handler.update_membership( + requester=requester, + target=requester.user, + room_id=room_id, + remote_room_hosts=remote_room_hosts, + action="join", + ) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 5d81f59b44..add3f9b009 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -20,7 +20,6 @@ from ._base import BaseHandler from synapse.api.constants import ( EventTypes, JoinRules, ) -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.async import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache @@ -44,8 +43,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache(hs) - self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) + self.response_cache = ResponseCache(hs, "room_list") + self.remote_response_cache = ResponseCache(hs, "remote_room_list", + timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, @@ -77,18 +77,11 @@ class RoomListHandler(BaseHandler): ) key = (limit, since_token, network_tuple) - result = self.response_cache.get(key) - if not result: - logger.info("No cached result, calculating one.") - result = self.response_cache.set( - key, - preserve_fn(self._get_public_room_list)( - limit, since_token, network_tuple=network_tuple - ) - ) - else: - logger.info("Using cached deferred result.") - return make_deferred_yieldable(result) + return self.response_cache.wrap( + key, + self._get_public_room_list, + limit, since_token, network_tuple=network_tuple, + ) @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, @@ -422,18 +415,14 @@ class RoomListHandler(BaseHandler): server_name, limit, since_token, include_all_networks, third_party_instance_id, ) - result = self.remote_response_cache.get(key) - if not result: - result = self.remote_response_cache.set( - key, - repl_layer.get_public_rooms( - server_name, limit=limit, since_token=since_token, - search_filter=search_filter, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) - ) - return result + return self.remote_response_cache.wrap( + key, + repl_layer.get_public_rooms, + server_name, limit=limit, since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) class RoomListNextBatch(namedtuple("RoomListNextBatch", ( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 9977be8831..714583f1d5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -149,7 +149,7 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, - prev_event_ids, + prev_events_and_hashes, txn_id=None, ratelimit=True, content=None, @@ -175,7 +175,7 @@ class RoomMemberHandler(object): }, token_id=requester.access_token_id, txn_id=txn_id, - prev_event_ids=prev_event_ids, + prev_events_and_hashes=prev_events_and_hashes, ) # Check if this event matches the previous membership event for the user. @@ -314,7 +314,12 @@ class RoomMemberHandler(object): 403, "Invites have been disabled on this server", ) - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + prev_events_and_hashes = yield self.store.get_prev_events_for_room( + room_id, + ) + latest_event_ids = ( + event_id for (event_id, _, _) in prev_events_and_hashes + ) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, ) @@ -403,7 +408,7 @@ class RoomMemberHandler(object): membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, - prev_event_ids=latest_event_ids, + prev_events_and_hashes=prev_events_and_hashes, content=content, ) defer.returnValue(res) @@ -852,6 +857,14 @@ class RoomMemberMasterHandler(RoomMemberHandler): def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join """ + # filter ourselves out of remote_room_hosts: do_invite_join ignores it + # and if it is the only entry we'd like to return a 404 rather than a + # 500. + + remote_room_hosts = [ + host for host in remote_room_hosts if host != self.hs.hostname + ] + if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0f713ce038..b52e4c2aff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -15,7 +15,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute -from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user @@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ to tell if room needs to be part of the sync result. """ return bool(self.events) + __bool__ = __nonzero__ # python3 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ @@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ # nb the notification count does not, er, count: if there's nothing # else in the result, we don't need to send it. ) + __bool__ = __nonzero__ # python3 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ @@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ or self.state or self.account_data ) + __bool__ = __nonzero__ # python3 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ @@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ def __nonzero__(self): """Invited rooms should always be reported to the client""" return True + __bool__ = __nonzero__ # python3 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ @@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ def __nonzero__(self): return bool(self.join or self.invite or self.leave) + __bool__ = __nonzero__ # python3 class DeviceLists(collections.namedtuple("DeviceLists", [ @@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [ def __nonzero__(self): return bool(self.changed or self.left) + __bool__ = __nonzero__ # python3 class SyncResult(collections.namedtuple("SyncResult", [ @@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.device_lists or self.groups ) + __bool__ = __nonzero__ # python3 class SyncHandler(object): @@ -169,7 +176,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache(hs) + self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, @@ -180,15 +187,11 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ - result = self.response_cache.get(sync_config.request_key) - if not result: - result = self.response_cache.set( - sync_config.request_key, - preserve_fn(self._wait_for_sync_for_user)( - sync_config, since_token, timeout, full_state - ) - ) - return make_deferred_yieldable(result) + return self.response_cache.wrap( + sync_config.request_key, + self._wait_for_sync_for_user, + sync_config, since_token, timeout, full_state, + ) @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, since_token, timeout, diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 87639b9151..00572c2897 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -12,8 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import socket - from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet import defer, reactor from twisted.internet.error import ConnectError @@ -33,7 +31,7 @@ SERVER_CACHE = {} # our record of an individual server which can be tried to reach a destination. # -# "host" is actually a dotted-quad or ipv6 address string. Except when there's +# "host" is the hostname acquired from the SRV record. Except when there's # no SRV record, in which case it is the original hostname. _Server = collections.namedtuple( "_Server", "priority weight host port expires" @@ -297,20 +295,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t payload = answer.payload - hosts = yield _get_hosts_for_srv_record( - dns_client, str(payload.target) - ) - - for (ip, ttl) in hosts: - host_ttl = min(answer.ttl, ttl) - - servers.append(_Server( - host=ip, - port=int(payload.port), - priority=int(payload.priority), - weight=int(payload.weight), - expires=int(clock.time()) + host_ttl, - )) + servers.append(_Server( + host=str(payload.target), + port=int(payload.port), + priority=int(payload.priority), + weight=int(payload.weight), + expires=int(clock.time()) + answer.ttl, + )) servers.sort() cache[service_name] = list(servers) @@ -328,81 +319,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t raise e defer.returnValue(servers) - - -@defer.inlineCallbacks -def _get_hosts_for_srv_record(dns_client, host): - """Look up each of the hosts in a SRV record - - Args: - dns_client (twisted.names.dns.IResolver): - host (basestring): host to look up - - Returns: - Deferred[list[(str, int)]]: a list of (host, ttl) pairs - - """ - ip4_servers = [] - ip6_servers = [] - - def cb(res): - # lookupAddress and lookupIP6Address return a three-tuple - # giving the answer, authority, and additional sections of the - # response. - # - # we only care about the answers. - - return res[0] - - def eb(res, record_type): - if res.check(DNSNameError): - return [] - logger.warn("Error looking up %s for %s: %s", record_type, host, res) - return res - - # no logcontexts here, so we can safely fire these off and gatherResults - d1 = dns_client.lookupAddress(host).addCallbacks( - cb, eb, errbackArgs=("A", )) - d2 = dns_client.lookupIPV6Address(host).addCallbacks( - cb, eb, errbackArgs=("AAAA", )) - results = yield defer.DeferredList( - [d1, d2], consumeErrors=True) - - # if all of the lookups failed, raise an exception rather than blowing out - # the cache with an empty result. - if results and all(s == defer.FAILURE for (s, _) in results): - defer.returnValue(results[0][1]) - - for (success, result) in results: - if success == defer.FAILURE: - continue - - for answer in result: - if not answer.payload: - continue - - try: - if answer.type == dns.A: - ip = answer.payload.dottedQuad() - ip4_servers.append((ip, answer.ttl)) - elif answer.type == dns.AAAA: - ip = socket.inet_ntop( - socket.AF_INET6, answer.payload.address, - ) - ip6_servers.append((ip, answer.ttl)) - else: - # the most likely candidate here is a CNAME record. - # rfc2782 says srvs may not point to aliases. - logger.warn( - "Ignoring unexpected DNS record type %s for %s", - answer.type, host, - ) - continue - except Exception as e: - logger.warn("Ignoring invalid DNS response for %s: %s", - host, e) - continue - - # keep the ipv4 results before the ipv6 results, mostly to match historical - # behaviour. - defer.returnValue(ip4_servers + ip6_servers) diff --git a/synapse/http/server.py b/synapse/http/server.py index 64e083ebfc..8d632290de 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -329,7 +329,7 @@ class JsonResource(HttpServer, resource.Resource): register_paths, so will return (possibly via Deferred) either None, or a tuple of (http code, response body). """ - if request.method == "OPTIONS": + if request.method == b"OPTIONS": return _options_handler, {} # Loop through all the registered callbacks to check if the method @@ -543,7 +543,7 @@ def finish_request(request): def _request_user_agent_is_curl(request): user_agents = request.requestHeaders.getRawHeaders( - "User-Agent", default=[] + b"User-Agent", default=[] ) for user_agent in user_agents: if "curl" in user_agent: diff --git a/synapse/http/site.py b/synapse/http/site.py index e422c8dfae..c8b46e1af2 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -20,7 +20,7 @@ import logging import re import time -ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') +ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') class SynapseRequest(Request): @@ -43,12 +43,12 @@ class SynapseRequest(Request): def get_redacted_uri(self): return ACCESS_TOKEN_RE.sub( - r'\1<redacted>\3', + br'\1<redacted>\3', self.uri ) def get_user_agent(self): - return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] def started_processing(self): self.site.access_logger.info( diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 50d99d7a5c..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -17,12 +17,13 @@ import logging import functools import time import gc +import platform from twisted.internet import reactor from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, + MemoryUsageMetric, GaugeMetric, ) from .process_collector import register_process_collector @@ -30,6 +31,7 @@ from .process_collector import register_process_collector logger = logging.getLogger(__name__) +running_on_pypy = platform.python_implementation() == 'PyPy' all_metrics = [] all_collectors = [] @@ -63,6 +65,13 @@ class Metrics(object): """ return self._register(CounterMetric, *args, **kwargs) + def register_gauge(self, *args, **kwargs): + """ + Returns: + GaugeMetric + """ + return self._register(GaugeMetric, *args, **kwargs) + def register_callback(self, *args, **kwargs): """ Returns: @@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +synapse_metrics = get_metrics_for("synapse") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = synapse_metrics.register_gauge( + "event_processing_positions", labels=["name"], +) + +# Used to track the current max events stream position +event_persisted_position = synapse_metrics.register_gauge( + "event_persisted_position", +) + +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): @@ -174,6 +209,9 @@ def runUntilCurrentTimer(func): tick_time.inc_by(end - start) pending_calls_metric.inc_by(num_pending) + if running_on_pypy: + return ret + # Check if we need to do a manual GC (since its been disabled), and do # one if necessary. threshold = gc.get_threshold() @@ -206,6 +244,7 @@ try: # We manually run the GC each reactor tick so that we can get some metrics # about time spent doing GC, - gc.disable() + if not running_on_pypy: + gc.disable() except AttributeError: pass diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index ff5aa8c0e1..89bd47c3f7 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -115,7 +115,7 @@ class CounterMetric(BaseMetric): # dict[list[str]]: value for each set of label values. the keys are the # label values, in the same order as the labels in self.labels. # - # (if the metric is a scalar, the (single) key is the empty list). + # (if the metric is a scalar, the (single) key is the empty tuple). self.counts = {} # Scalar metrics are never empty @@ -145,6 +145,36 @@ class CounterMetric(BaseMetric): ) +class GaugeMetric(BaseMetric): + """A metric that can go up or down + """ + + def __init__(self, *args, **kwargs): + super(GaugeMetric, self).__init__(*args, **kwargs) + + # dict[list[str]]: value for each set of label values. the keys are the + # label values, in the same order as the labels in self.labels. + # + # (if the metric is a scalar, the (single) key is the empty tuple). + self.guages = {} + + def set(self, v, *values): + if len(values) != self.dimension(): + raise ValueError( + "Expected as many values to inc() as labels (%d)" % (self.dimension()) + ) + + # TODO: should assert that the tag values are all strings + + self.guages[values] = v + + def render(self): + return flatten( + self._render_for_labels(k, self.guages[k]) + for k in sorted(self.guages.keys()) + ) + + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever it is rendered. Typically this is used to implement gauges that yield the diff --git a/synapse/notifier.py b/synapse/notifier.py index ef042681bc..0e40a4aad6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -144,6 +144,7 @@ class _NotifierUserStream(object): class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): def __nonzero__(self): return bool(self.events) + __bool__ = __nonzero__ # python3 class Notifier(object): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 5cabf7dabe..711cbb6c50 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -1,5 +1,6 @@ # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +19,18 @@ from distutils.version import LooseVersion logger = logging.getLogger(__name__) +# this dict maps from python package name to a list of modules we expect it to +# provide. +# +# the key is a "requirement specifier", as used as a parameter to `pip +# install`[1], or an `install_requires` argument to `setuptools.setup` [2]. +# +# the value is a sequence of strings; each entry should be the name of the +# python module, optionally followed by a version assertion which can be either +# ">=<ver>" or "==<ver>". +# +# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers. +# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies REQUIREMENTS = { "jsonschema>=2.5.1": ["jsonschema>=2.5.1"], "frozendict>=0.4": ["frozendict"], @@ -26,7 +39,11 @@ REQUIREMENTS = { "signedjson>=1.0.0": ["signedjson>=1.0.0"], "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], - "Twisted>=16.0.0": ["twisted>=16.0.0"], + + # we break under Twisted 18.4 + # (https://github.com/matrix-org/synapse/issues/3135) + "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"], + "pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyyaml": ["yaml"], "pyasn1": ["pyasn1"], @@ -39,6 +56,7 @@ REQUIREMENTS = { "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], + "six": ["six"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index bbe2f967b7..a9baa2c1c3 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.util.async import sleep from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.metrics import Measure from synapse.types import Requester, UserID @@ -115,20 +114,15 @@ class ReplicationSendEventRestServlet(RestServlet): self.clock = hs.get_clock() # The responses are tiny, so we may as well cache them for a while - self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000) + self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) def on_PUT(self, request, event_id): - result = self.response_cache.get(event_id) - if not result: - result = self.response_cache.set( - event_id, - self._handle_request(request) - ) - else: - logger.warn("Returning cached response") - return make_deferred_yieldable(result) - - @preserve_fn + return self.response_cache.wrap( + event_id, + self._handle_request, + request + ) + @defer.inlineCallbacks def _handle_request(self, request): with Measure(self.clock, "repl_send_event_parse"): diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py index ca49955935..e092158cb7 100644 --- a/synapse/rest/client/v1/logout.py +++ b/synapse/rest/client/v1/logout.py @@ -44,7 +44,10 @@ class LogoutRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request) except AuthError: # this implies the access token has already been deleted. - pass + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) else: if requester.device_id is None: # the acccess token wasn't associated with a device. diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 5c5fa8f7ab..8a82097178 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -348,9 +348,9 @@ class RegisterRestServlet(ClientV1RestServlet): admin = register_json.get("admin", None) # Its important to check as we use null bytes as HMAC field separators - if "\x00" in user: + if b"\x00" in user: raise SynapseError(400, "Invalid user") - if "\x00" in password: + if b"\x00" in password: raise SynapseError(400, "Invalid password") # str() because otherwise hmac complains that 'unicode' does not diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index d06cbdc35e..2ad0e5943b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet): content=content, ) else: - event, context = yield self.event_creation_hander.create_event( + event = yield self.event_creation_hander.create_and_send_nonmember_event( requester, event_dict, - token_id=requester.access_token_id, txn_id=txn_id, ) - yield self.event_creation_hander.send_nonmember_event( - requester, event, context, - ) - ret = {} if event: ret = {"event_id": event.event_id} diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 0ba62bddc1..f317c919dc 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -20,7 +20,6 @@ import synapse import synapse.types from synapse.api.auth import get_access_token_from_request, has_access_token from synapse.api.constants import LoginType -from synapse.types import RoomID, RoomAlias from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string @@ -405,14 +404,6 @@ class RegisterRestServlet(RestServlet): generate_token=False, ) - # auto-join the user to any rooms we're supposed to dump them into - fake_requester = synapse.types.create_requester(registered_user_id) - for r in self.hs.config.auto_join_rooms: - try: - yield self._join_user_to_room(fake_requester, r) - except Exception as e: - logger.error("Failed to join new user to %r: %r", r, e) - # remember that we've now registered that user account, and with # what user ID (since the user may not have specified) self.auth_handler.set_session_data( @@ -446,29 +437,6 @@ class RegisterRestServlet(RestServlet): return 200, {} @defer.inlineCallbacks - def _join_user_to_room(self, requester, room_identifier): - room_id = None - if RoomID.is_valid(room_identifier): - room_id = room_identifier - elif RoomAlias.is_valid(room_identifier): - room_alias = RoomAlias.from_string(room_identifier) - room_id, remote_room_hosts = ( - yield self.room_member_handler.lookup_room_alias(room_alias) - ) - room_id = room_id.to_string() - else: - raise SynapseError(400, "%s was not legal room ID or room alias" % ( - room_identifier, - )) - - yield self.room_member_handler.update_membership( - requester=requester, - target=requester.user, - room_id=room_id, - action="join", - ) - - @defer.inlineCallbacks def _do_appservice_registration(self, username, as_token, body): user_id = yield self.registration_handler.appservice_register( username, as_token diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 83471b3173..7f263db239 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -16,6 +16,8 @@ from twisted.internet import defer, threads from twisted.protocols.basic import FileSender +import six + from ._base import Responder from synapse.util.file_consumer import BackgroundFileConsumer @@ -119,7 +121,7 @@ class MediaStorage(object): os.remove(fname) except Exception: pass - raise t, v, tb + six.reraise(t, v, tb) if not finished_called: raise Exception("Finished callback not called") diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eacd49d6a5..8cdfd50f90 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore, def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days + * Users who have created their accounts more than 30 days ago * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days + * Where account creation and last_seen are > 30 days apart Returns counts globaly for a given user as well as breaking by platform diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2fbebd4907..2262776ab2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -376,7 +376,7 @@ class SQLBaseStore(object): Returns: A list of dicts where the key is the column header. """ - col_headers = list(intern(column[0]) for column in cursor.description) + col_headers = list(intern(str(column[0])) for column in cursor.description) results = list( dict(zip(col_headers, row)) for row in cursor ) diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 338b495611..8c868ece75 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -18,6 +18,7 @@ from .postgres import PostgresEngine from .sqlite3 import Sqlite3Engine import importlib +import platform SUPPORTED_MODULE = { @@ -31,6 +32,10 @@ def create_engine(database_config): engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: + # pypy requires psycopg2cffi rather than psycopg2 + if (name == "psycopg2" and + platform.python_implementation() == "PyPy"): + name = "psycopg2cffi" module = importlib.import_module(name) return engine_class(module, database_config) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 00ee82d300..8fbf7ffba7 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import random from twisted.internet import defer @@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 import logging -from Queue import PriorityQueue, Empty +from six.moves.queue import PriorityQueue, Empty + +from six.moves import range logger = logging.getLogger(__name__) @@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, front_list = list(front) chunks = [ front_list[x:x + 100] - for x in xrange(0, len(front), 100) + for x in range(0, len(front), 100) ] for chunk in chunks: txn.execute( @@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, retcol="event_id", ) + @defer.inlineCallbacks + def get_prev_events_for_room(self, room_id): + """ + Gets a subset of the current forward extremities in the given room. + + Limits the result to 10 extremities, so that we can avoid creating + events which refer to hundreds of prev_events. + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + res = yield self.get_latest_event_ids_and_hashes_in_room(room_id) + if len(res) > 10: + # Sort by reverse depth, so we point to the most recent. + res.sort(key=lambda a: -a[2]) + + # we use half of the limit for the actual most recent events, and + # the other half to randomly point to some of the older events, to + # make sure that we don't completely ignore the older events. + res = res[0:5] + random.sample(res[5:], 5) + + defer.returnValue(res) + def get_latest_event_ids_and_hashes_in_room(self, room_id): + """ + Gets the current forward extremities in the given room + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + return self.runInteraction( "get_latest_event_ids_and_hashes_in_room", self._get_latest_event_ids_and_hashes_in_room, @@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, room_id, ) - @defer.inlineCallbacks - def get_max_depth_of_events(self, event_ids): - sql = ( - "SELECT MAX(depth) FROM events WHERE event_id IN (%s)" - ) % (",".join(["?"] * len(event_ids)),) - - rows = yield self._execute( - "get_max_depth_of_events", None, - sql, *event_ids - ) - - if rows: - defer.returnValue(rows[0][0]) - else: - defer.returnValue(1) - def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( txn, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ece5e6c41f..5fe4a0e56c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -16,6 +16,7 @@ from collections import OrderedDict, deque, namedtuple from functools import wraps +import itertools import logging import simplejson as json @@ -444,6 +445,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" @@ -1317,13 +1321,49 @@ class EventsStore(EventsWorkerStore): defer.returnValue(set(r["event_id"] for r in rows)) - def have_events(self, event_ids): + @defer.inlineCallbacks + def have_seen_events(self, event_ids): """Given a list of event ids, check if we have already processed them. + Args: + event_ids (iterable[str]): + + Returns: + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. """ if not event_ids: return defer.succeed({}) @@ -1345,9 +1385,7 @@ class EventsStore(EventsWorkerStore): return res - return self.runInteraction( - "have_events", f, - ) + return self.runInteraction("get_rejection_reasons", f) @defer.inlineCallbacks def count_daily_messages(self): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..a937b9bceb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event. + + Raises an exception for unknown events. + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timestamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 740c036975..ea6a189185 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore): # Convert the IDs to MXC URIs for media_id in local_mxcs: - local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id)) + local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id)) for hostname, media_id in remote_mxcs: remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) @@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore): while next_token: sql = """ SELECT stream_ordering, json FROM events - JOIN event_json USING (event_id) + JOIN event_json USING (room_id, event_id) WHERE room_id = ? AND stream_ordering < ? AND contains_url = ? AND outlier = ? @@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore): if matches: hostname = matches.group(1) media_id = matches.group(2) - if hostname == self.hostname: + if hostname == self.hs.hostname: local_media_mxcs.append(media_id) else: remote_media_mxcs.append((hostname, media_id)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 426cbe6e1a..6ba3e59889 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT stream_ordering, event_id, room_id, type, json, " " origin_server_ts FROM events" - " JOIN event_json USING (event_id)" + " JOIN event_json USING (room_id, event_id)" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" diff --git a/synapse/types.py b/synapse/types.py index 7cb24cecb2..cc7c182a78 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -169,7 +169,7 @@ class DomainSpecificString( except Exception: return False - __str__ = to_string + __repr__ = to_string class UserID(DomainSpecificString): diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 00af539880..7f79333e96 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -12,8 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + +from twisted.internet import defer from synapse.util.async import ObservableDeferred +from synapse.util.caches import metrics as cache_metrics +from synapse.util.logcontext import make_deferred_yieldable, run_in_background + +logger = logging.getLogger(__name__) class ResponseCache(object): @@ -24,20 +31,68 @@ class ResponseCache(object): used rather than trying to compute a new response. """ - def __init__(self, hs, timeout_ms=0): + def __init__(self, hs, name, timeout_ms=0): self.pending_result_cache = {} # Requests that haven't finished yet. self.clock = hs.get_clock() self.timeout_sec = timeout_ms / 1000. + self._name = name + self._metrics = cache_metrics.register_cache( + "response_cache", + size_callback=lambda: self.size(), + cache_name=name, + ) + + def size(self): + return len(self.pending_result_cache) + def get(self, key): + """Look up the given key. + + Can return either a new Deferred (which also doesn't follow the synapse + logcontext rules), or, if the request has completed, the actual + result. You will probably want to make_deferred_yieldable the result. + + If there is no entry for the key, returns None. It is worth noting that + this means there is no way to distinguish a completed result of None + from an absent cache entry. + + Args: + key (hashable): + + Returns: + twisted.internet.defer.Deferred|None|E: None if there is no entry + for this key; otherwise either a deferred result or the result + itself. + """ result = self.pending_result_cache.get(key) if result is not None: + self._metrics.inc_hits() return result.observe() else: + self._metrics.inc_misses() return None def set(self, key, deferred): + """Set the entry for the given key to the given deferred. + + *deferred* should run its callbacks in the sentinel logcontext (ie, + you should wrap normal synapse deferreds with + logcontext.run_in_background). + + Can return either a new Deferred (which also doesn't follow the synapse + logcontext rules), or, if *deferred* was already complete, the actual + result. You will probably want to make_deferred_yieldable the result. + + Args: + key (hashable): + deferred (twisted.internet.defer.Deferred[T): + + Returns: + twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual + result. + """ result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result @@ -53,3 +108,52 @@ class ResponseCache(object): result.addBoth(remove) return result.observe() + + def wrap(self, key, callback, *args, **kwargs): + """Wrap together a *get* and *set* call, taking care of logcontexts + + First looks up the key in the cache, and if it is present makes it + follow the synapse logcontext rules and returns it. + + Otherwise, makes a call to *callback(*args, **kwargs)*, which should + follow the synapse logcontext rules, and adds the result to the cache. + + Example usage: + + @defer.inlineCallbacks + def handle_request(request): + # etc + defer.returnValue(result) + + result = yield response_cache.wrap( + key, + handle_request, + request, + ) + + Args: + key (hashable): key to get/set in the cache + + callback (callable): function to call if the key is not found in + the cache + + *args: positional parameters to pass to the callback, if it is used + + **kwargs: named paramters to pass to the callback, if it is used + + Returns: + twisted.internet.defer.Deferred: yieldable result + """ + result = self.get(key) + if not result: + logger.info("[%s]: no cached result for [%s], calculating new one", + self._name, key) + d = run_in_background(callback, *args, **kwargs) + result = self.set(key, d) + elif not isinstance(result, defer.Deferred) or result.called: + logger.info("[%s]: using completed cached result for [%s]", + self._name, key) + else: + logger.info("[%s]: using incomplete cached result for [%s]", + self._name, key) + return make_deferred_yieldable(result) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 90a2608d6f..3c8a165331 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -17,7 +17,7 @@ from twisted.internet import threads, reactor from synapse.util.logcontext import make_deferred_yieldable, preserve_fn -import Queue +from six.moves import queue class BackgroundFileConsumer(object): @@ -49,7 +49,7 @@ class BackgroundFileConsumer(object): # Queue of slices of bytes to be written. When producer calls # unregister a final None is sent. - self._bytes_queue = Queue.Queue() + self._bytes_queue = queue.Queue() # Deferred that is resolved when finished writing self._finished_deferred = None diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d660ec785b..d59adc236e 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -92,6 +92,7 @@ class LoggingContext(object): def __nonzero__(self): return False + __bool__ = __nonzero__ # python3 sentinel = Sentinel() diff --git a/tests/config/test_load.py b/tests/config/test_load.py index 161a87d7e3..772afd2cf9 100644 --- a/tests/config/test_load.py +++ b/tests/config/test_load.py @@ -24,7 +24,7 @@ class ConfigLoadingTestCase(unittest.TestCase): def setUp(self): self.dir = tempfile.mkdtemp() - print self.dir + print(self.dir) self.file = os.path.join(self.dir, "homeserver.yaml") def tearDown(self): diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index d4ec02ffc2..149e443022 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase): res_deferreds_2 = kr.verify_json_objects_for_server( [("server10", json1)], ) - yield async.sleep(01) + yield async.sleep(1) self.http_client.post_json.assert_not_called() res_deferreds_2[0].addBoth(self.check_context, None) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a667fb6f0e..b753455943 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) + self.mock_store.get_received_ts.return_value = 0 hs.get_application_service_api = Mock(return_value=self.mock_as_api) hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) hs.get_clock.return_value = MockClock() diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index 2b89c0a3c7..a8d09600bd 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -123,6 +123,7 @@ class EventStreamPermissionsTestCase(RestTestCase): self.ratelimiter.send_message.return_value = (True, 0) hs.config.enable_registration_captcha = False hs.config.enable_registration = True + hs.config.auto_join_rooms = [] hs.get_handlers().federation_handler = Mock() diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index c2e39a7288..00825498b1 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -480,9 +480,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): ApplicationServiceStore(None, hs) e = cm.exception - self.assertIn(f1, e.message) - self.assertIn(f2, e.message) - self.assertIn("id", e.message) + self.assertIn(f1, str(e)) + self.assertIn(f2, str(e)) + self.assertIn("id", str(e)) @defer.inlineCallbacks def test_duplicate_as_tokens(self): @@ -504,6 +504,6 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): ApplicationServiceStore(None, hs) e = cm.exception - self.assertIn(f1, e.message) - self.assertIn(f2, e.message) - self.assertIn("as_token", e.message) + self.assertIn(f1, str(e)) + self.assertIn(f2, str(e)) + self.assertIn("as_token", str(e)) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py new file mode 100644 index 0000000000..30683e7888 --- /dev/null +++ b/tests/storage/test_event_federation.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import tests.unittest +import tests.utils + + +class EventFederationWorkerStoreTestCase(tests.unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_get_prev_events_for_room(self): + room_id = '@ROOM:local' + + # add a bunch of events and hashes to act as forward extremities + def insert_event(txn, i): + event_id = '$event_%i:local' % i + + txn.execute(( + "INSERT INTO events (" + " room_id, event_id, type, depth, topological_ordering," + " content, processed, outlier) " + "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)" + ), (room_id, event_id, i, i, True, False)) + + txn.execute(( + 'INSERT INTO event_forward_extremities (room_id, event_id) ' + 'VALUES (?, ?)' + ), (room_id, event_id)) + + txn.execute(( + 'INSERT INTO event_reference_hashes ' + '(event_id, algorithm, hash) ' + "VALUES (?, 'sha256', ?)" + ), (event_id, 'ffff')) + + for i in range(0, 11): + yield self.store.runInteraction("insert", insert_event, i) + + # this should get the last five and five others + r = yield self.store.get_prev_events_for_room(room_id) + self.assertEqual(10, len(r)) + for i in range(0, 5): + el = r[i] + depth = el[2] + self.assertEqual(10 - i, depth) + + for i in range(5, 5): + el = r[i] + depth = el[2] + self.assertLessEqual(5, depth) diff --git a/tests/test_distributor.py b/tests/test_distributor.py index acebcf4a86..010aeaee7e 100644 --- a/tests/test_distributor.py +++ b/tests/test_distributor.py @@ -62,7 +62,7 @@ class DistributorTestCase(unittest.TestCase): def test_signal_catch(self): self.dist.declare("alarm") - observers = [Mock() for i in 1, 2] + observers = [Mock() for i in (1, 2)] for o in observers: self.dist.observe("alarm", o) diff --git a/tests/test_dns.py b/tests/test_dns.py index d08b0f4333..af607d626f 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -33,8 +33,6 @@ class DnsTestCase(unittest.TestCase): service_name = "test_service.example.com" host_name = "example.com" - ip_address = "127.0.0.1" - ip6_address = "::1" answer_srv = dns.RRHeader( type=dns.SRV, @@ -43,29 +41,9 @@ class DnsTestCase(unittest.TestCase): ) ) - answer_a = dns.RRHeader( - type=dns.A, - payload=dns.Record_A( - address=ip_address, - ) - ) - - answer_aaaa = dns.RRHeader( - type=dns.AAAA, - payload=dns.Record_AAAA( - address=ip6_address, - ) - ) - dns_client_mock.lookupService.return_value = defer.succeed( ([answer_srv], None, None), ) - dns_client_mock.lookupAddress.return_value = defer.succeed( - ([answer_a], None, None), - ) - dns_client_mock.lookupIPV6Address.return_value = defer.succeed( - ([answer_aaaa], None, None), - ) cache = {} @@ -74,13 +52,10 @@ class DnsTestCase(unittest.TestCase): ) dns_client_mock.lookupService.assert_called_once_with(service_name) - dns_client_mock.lookupAddress.assert_called_once_with(host_name) - dns_client_mock.lookupIPV6Address.assert_called_once_with(host_name) - self.assertEquals(len(servers), 2) + self.assertEquals(len(servers), 1) self.assertEquals(servers, cache[service_name]) - self.assertEquals(servers[0].host, ip_address) - self.assertEquals(servers[1].host, ip6_address) + self.assertEquals(servers[0].host, host_name) @defer.inlineCallbacks def test_from_cache_expired_and_dns_fail(self): diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py index 76e2234255..d6e1082779 100644 --- a/tests/util/test_file_consumer.py +++ b/tests/util/test_file_consumer.py @@ -20,7 +20,7 @@ from mock import NonCallableMock from synapse.util.file_consumer import BackgroundFileConsumer from tests import unittest -from StringIO import StringIO +from six import StringIO import threading diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 793a88e462..4865eb4bc6 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -18,6 +18,7 @@ from tests import unittest from twisted.internet import defer from synapse.util.async import Linearizer +from six.moves import range class LinearizerTestCase(unittest.TestCase): @@ -58,7 +59,7 @@ class LinearizerTestCase(unittest.TestCase): logcontext.LoggingContext.current_context(), lc) func(0, sleep=True) - for i in xrange(1, 100): + for i in range(1, 100): func(i) return func(1000) diff --git a/tests/utils.py b/tests/utils.py index 8efd3a3475..0cd9f7eeee 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -59,6 +59,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.email_enable_notifs = False config.block_non_admin_invites = False config.federation_domain_whitelist = None + config.federation_rc_reject_limit = 10 + config.federation_rc_sleep_limit = 10 + config.federation_rc_concurrent = 10 + config.filter_timeline_limit = 5000 config.user_directory_search_all_users = False # disable user directory updates, because they get done in the @@ -212,7 +216,7 @@ class MockHttpResource(HttpServer): headers = {} if federation_auth: - headers["Authorization"] = ["X-Matrix origin=test,key=,sig="] + headers[b"Authorization"] = ["X-Matrix origin=test,key=,sig="] mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers) # return the right path if the event requires it |