diff options
author | Erik Johnston <erik@matrix.org> | 2019-03-04 11:54:58 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-03-04 11:54:58 +0000 |
commit | fbc047f2a5f12ee934e5ccbe7274100aa72166b5 (patch) | |
tree | 2eabc4f13032883ff61fc635d0be43292a5ad131 /synapse | |
parent | Update newsfile to have a full stop (diff) | |
parent | Update test_typing to use HomeserverTestCase. (#4771) (diff) | |
download | synapse-fbc047f2a5f12ee934e5ccbe7274100aa72166b5.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/stop_fed_not_in_room
Diffstat (limited to 'synapse')
36 files changed, 571 insertions, 191 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 2004375f98..25c10244d3 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -27,4 +27,4 @@ try: except ImportError: pass -__version__ = "0.99.1.1" +__version__ = "0.99.2" diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 043b48f8f3..5070094cad 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -48,6 +48,7 @@ from synapse.rest.client.v1.room import ( RoomMemberListRestServlet, RoomStateRestServlet, ) +from synapse.rest.client.v2_alpha.account import ThreepidRestServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -96,6 +97,7 @@ class ClientReaderServer(HomeServer): RoomEventContextServlet(self).register(resource) RegisterRestServlet(self).register(resource) LoginRestServlet(self).register(resource) + ThreepidRestServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index b116c17669..7da79dc827 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -21,7 +21,7 @@ from twisted.web.resource import NoResource import synapse from synapse import events -from synapse.api.urls import FEDERATION_PREFIX +from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig @@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer): ), }) + if name in ["keys", "federation"]: + resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) + root_resource = create_resource_tree(resources, NoResource()) _base.listen_tcp( diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index d5b954361d..8479fee738 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -21,7 +21,7 @@ from twisted.web.resource import NoResource import synapse from synapse import events -from synapse.api.errors import SynapseError +from synapse.api.errors import HttpResponseException, SynapseError from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig @@ -66,10 +66,15 @@ class PresenceStatusStubServlet(ClientV1RestServlet): headers = { "Authorization": auth_headers, } - result = yield self.http_client.get_json( - self.main_uri + request.uri.decode('ascii'), - headers=headers, - ) + + try: + result = yield self.http_client.get_json( + self.main_uri + request.uri.decode('ascii'), + headers=headers, + ) + except HttpResponseException as e: + raise e.to_synapse_error() + defer.returnValue((200, result)) @defer.inlineCallbacks diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 05a97979ec..e8b6cc3114 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -555,6 +555,9 @@ def run(hs): stats["memory_rss"] += process.memory_info().rss stats["cpu_average"] += int(process.cpu_percent(interval=None)) + stats["database_engine"] = hs.get_datastore().database_engine_name + stats["database_server_version"] = hs.get_datastore().get_server_version() + logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: yield hs.get_simple_http_client().put_json( diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py index 4064891ffb..d25196be08 100644 --- a/synapse/config/captcha.py +++ b/synapse/config/captcha.py @@ -47,5 +47,5 @@ class CaptchaConfig(Config): #captcha_bypass_secret: "YOUR_SECRET_HERE" # The API endpoint to use for verifying m.login.recaptcha responses. - recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify" + recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify" """ diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 8d5d287357..40045de7ac 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -19,6 +19,8 @@ import warnings from datetime import datetime from hashlib import sha256 +import six + from unpaddedbase64 import encode_base64 from OpenSSL import crypto @@ -36,9 +38,11 @@ class TlsConfig(Config): acme_config = {} self.acme_enabled = acme_config.get("enabled", False) - self.acme_url = acme_config.get( + + # hyperlink complains on py2 if this is not a Unicode + self.acme_url = six.text_type(acme_config.get( "url", u"https://acme-v01.api.letsencrypt.org/directory" - ) + )) self.acme_port = acme_config.get("port", 80) self.acme_bind_addresses = acme_config.get("bind_addresses", ['::', '0.0.0.0']) self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30) @@ -55,7 +59,7 @@ class TlsConfig(Config): ) if not self.tls_private_key_file: raise ConfigError( - "tls_certificate_path must be specified if TLS-enabled listeners are " + "tls_private_key_path must be specified if TLS-enabled listeners are " "configured." ) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index cce40fdd2d..7474fd515f 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -17,6 +17,7 @@ import logging from collections import namedtuple +from six import raise_from from six.moves import urllib from signedjson.key import ( @@ -35,7 +36,12 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer -from synapse.api.errors import Codes, RequestSendFailed, SynapseError +from synapse.api.errors import ( + Codes, + HttpResponseException, + RequestSendFailed, + SynapseError, +) from synapse.util import logcontext, unwrapFirstError from synapse.util.logcontext import ( LoggingContext, @@ -44,6 +50,7 @@ from synapse.util.logcontext import ( run_in_background, ) from synapse.util.metrics import Measure +from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -367,13 +374,18 @@ class Keyring(object): server_name_and_key_ids, perspective_name, perspective_keys ) defer.returnValue(result) + except KeyLookupError as e: + logger.warning( + "Key lookup failed from %r: %s", perspective_name, e, + ) except Exception as e: logger.exception( "Unable to get key from %r: %s %s", perspective_name, type(e).__name__, str(e), ) - defer.returnValue({}) + + defer.returnValue({}) results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ @@ -421,21 +433,30 @@ class Keyring(object): # TODO(mark): Set the minimum_valid_until_ts to that needed by # the events being validated or the current time if validating # an incoming request. - query_response = yield self.client.post_json( - destination=perspective_name, - path="/_matrix/key/v2/query", - data={ - u"server_keys": { - server_name: { - key_id: { - u"minimum_valid_until_ts": 0 - } for key_id in key_ids + try: + query_response = yield self.client.post_json( + destination=perspective_name, + path="/_matrix/key/v2/query", + data={ + u"server_keys": { + server_name: { + key_id: { + u"minimum_valid_until_ts": 0 + } for key_id in key_ids + } + for server_name, key_ids in server_names_and_key_ids } - for server_name, key_ids in server_names_and_key_ids - } - }, - long_retries=True, - ) + }, + long_retries=True, + ) + except (NotRetryingDestination, RequestSendFailed) as e: + raise_from( + KeyLookupError("Failed to connect to remote server"), e, + ) + except HttpResponseException as e: + raise_from( + KeyLookupError("Remote server returned an error"), e, + ) keys = {} @@ -502,11 +523,20 @@ class Keyring(object): if requested_key_id in keys: continue - response = yield self.client.get_json( - destination=server_name, - path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id), - ignore_backoff=True, - ) + try: + response = yield self.client.get_json( + destination=server_name, + path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id), + ignore_backoff=True, + ) + except (NotRetryingDestination, RequestSendFailed) as e: + raise_from( + KeyLookupError("Failed to connect to remote server"), e, + ) + except HttpResponseException as e: + raise_from( + KeyLookupError("Remote server returned an error"), e, + ) if (u"signatures" not in response or server_name not in response[u"signatures"]): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4e4f58b418..58e04d81ab 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -33,6 +33,7 @@ from synapse.api.constants import ( ) from synapse.api.errors import ( CodeMessageException, + Codes, FederationDeniedError, HttpResponseException, SynapseError, @@ -792,10 +793,25 @@ class FederationClient(FederationBase): defer.returnValue(content) except HttpResponseException as e: if e.code in [400, 404]: + err = e.to_synapse_error() + + # If we receive an error response that isn't a generic error, we + # assume that the remote understands the v2 invite API and this + # is a legitimate error. + if err.errcode != Codes.UNKNOWN: + raise err + + # Otherwise, we assume that the remote server doesn't understand + # the v2 invite API. + if room_version in (RoomVersions.V1, RoomVersions.V2): pass # We'll fall through else: - raise Exception("Remote server is too old") + raise SynapseError( + 400, + "User's homeserver does not support this room version", + Codes.UNSUPPORTED_ROOM_VERSION, + ) elif e.code == 403: raise e.to_synapse_error() else: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3da86d4ba6..81f3b4b1ff 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -25,9 +25,10 @@ from twisted.internet import defer from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership from synapse.api.errors import ( AuthError, + Codes, FederationError, IncompatibleRoomVersionError, NotFoundError, @@ -239,8 +240,9 @@ class FederationServer(FederationBase): f = failure.Failure() pdu_results[event_id] = {"error": str(e)} logger.error( - "Failed to handle PDU %s: %s", - event_id, f.getTraceback().rstrip(), + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), ) yield concurrently_execute( @@ -386,6 +388,13 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_invite_request(self, origin, content, room_version): + if room_version not in KNOWN_ROOM_VERSIONS: + raise SynapseError( + 400, + "Homeserver does not support this room version", + Codes.UNSUPPORTED_ROOM_VERSION, + ) + format_ver = room_version_to_event_format(room_version) pdu = event_from_pdu_json(content, format_ver) @@ -877,6 +886,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): def on_edu(self, edu_type, origin, content): """Overrides FederationHandlerRegistry """ + if not self.config.use_presence and edu_type == "m.presence": + return + handler = self.edu_handlers.get(edu_type) if handler: return super(ReplicationFederationHandlerRegistry, self).on_edu( diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a2396ab466..ebb81be377 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -393,7 +393,7 @@ class FederationStateServlet(BaseFederationServlet): return self.handler.on_context_state_request( origin, context, - parse_string_from_args(query, "event_id", None), + parse_string_from_args(query, "event_id", None, required=True), ) @@ -404,7 +404,7 @@ class FederationStateIdsServlet(BaseFederationServlet): return self.handler.on_state_ids_request( origin, room_id, - parse_string_from_args(query, "event_id", None), + parse_string_from_args(query, "event_id", None, required=True), ) @@ -736,7 +736,8 @@ class PublicRoomList(BaseFederationServlet): data = yield self.handler.get_local_public_room_list( limit, since_token, - network_tuple=network_tuple + network_tuple=network_tuple, + from_federation=True, ) defer.returnValue((200, data)) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 633c865ed8..a7eaead56b 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -113,8 +113,7 @@ class GroupsServerHandler(object): room_id = room_entry["room_id"] joined_users = yield self.store.get_users_in_room(room_id) entry = yield self.room_list_handler.generate_room_entry( - room_id, len(joined_users), - with_alias=False, allow_private=True, + room_id, len(joined_users), with_alias=False, allow_private=True, ) entry = dict(entry) # so we don't change whats cached entry.pop("room_id", None) @@ -544,8 +543,7 @@ class GroupsServerHandler(object): joined_users = yield self.store.get_users_in_room(room_id) entry = yield self.room_list_handler.generate_room_entry( - room_id, len(joined_users), - with_alias=False, allow_private=True, + room_id, len(joined_users), with_alias=False, allow_private=True, ) if not entry: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index de839ca527..0425380e55 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -770,10 +770,26 @@ class FederationHandler(BaseHandler): set(auth_events.keys()) | set(state_events.keys()) ) + # We now have a chunk of events plus associated state and auth chain to + # persist. We do the persistence in two steps: + # 1. Auth events and state get persisted as outliers, plus the + # backward extremities get persisted (as non-outliers). + # 2. The rest of the events in the chunk get persisted one by one, as + # each one depends on the previous event for its state. + # + # The important thing is that events in the chunk get persisted as + # non-outliers, including when those events are also in the state or + # auth chain. Caution must therefore be taken to ensure that they are + # not accidentally marked as outliers. + + # Step 1a: persist auth events that *don't* appear in the chunk ev_infos = [] for a in auth_events.values(): - if a.event_id in seen_events: + # We only want to persist auth events as outliers that we haven't + # seen and aren't about to persist as part of the backfilled chunk. + if a.event_id in seen_events or a.event_id in event_map: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, @@ -785,14 +801,21 @@ class FederationHandler(BaseHandler): } }) + # Step 1b: persist the events in the chunk we fetched state for (i.e. + # the backwards extremities) as non-outliers. for e_id in events_to_state: + # For paranoia we ensure that these events are marked as + # non-outliers + ev = event_map[e_id] + assert(not ev.internal_metadata.is_outlier()) + ev_infos.append({ - "event": event_map[e_id], + "event": ev, "state": events_to_state[e_id], "auth_events": { (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] - for a_id in event_map[e_id].auth_event_ids() + for a_id in ev.auth_event_ids() if a_id in auth_events } }) @@ -802,12 +825,17 @@ class FederationHandler(BaseHandler): backfilled=True, ) + # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) for event in events: if event in events_to_state: continue + # For paranoia we ensure that these events are marked as + # non-outliers + assert(not event.internal_metadata.is_outlier()) + # We store these one at a time since each event depends on the # previous to work out the state. # TODO: We can probably do something more clever here. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3981fe69ce..c762b58902 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -436,10 +436,11 @@ class EventCreationHandler(object): if event.is_state(): prev_state = yield self.deduplicate_state_event(event, context) - logger.info( - "Not bothering to persist duplicate state event %s", event.event_id, - ) if prev_state is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, prev_state.event_id, + ) defer.returnValue(prev_state) yield self.handle_new_client_event( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 9d257ecf31..e4fdae9266 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -136,7 +136,11 @@ class PaginationHandler(object): logger.info("[purge] complete") self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE except Exception: - logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + f = Failure() + logger.error( + "[purge] failed", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED finally: self._purges_in_progress_by_room.discard(room_id) @@ -254,7 +258,7 @@ class PaginationHandler(object): }) state = None - if event_filter and event_filter.lazy_load_members(): + if event_filter and event_filter.lazy_load_members() and len(events) > 0: # TODO: remove redundant members # FIXME: we also care about invite targets etc. diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 4c2690ba26..696469732c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -16,8 +16,8 @@ import logging from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id -from synapse.util import logcontext from ._base import BaseHandler @@ -59,7 +59,9 @@ class ReceiptsHandler(BaseHandler): if is_new: # fire off a process in the background to send the receipt to # remote servers - self._push_remotes([receipt]) + run_as_background_process( + 'push_receipts_to_remotes', self._push_remotes, receipt + ) @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): @@ -125,44 +127,42 @@ class ReceiptsHandler(BaseHandler): defer.returnValue(True) - @logcontext.preserve_fn # caller should not yield on this @defer.inlineCallbacks - def _push_remotes(self, receipts): - """Given a list of receipts, works out which remote servers should be + def _push_remotes(self, receipt): + """Given a receipt, works out which remote servers should be poked and pokes them. """ try: - # TODO: Some of this stuff should be coallesced. - for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, - } + # TODO: optimise this to move some of the work to the workers. + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, } - }, + } }, - key=(room_id, receipt_type, user_id), - ) + }, + key=(room_id, receipt_type, user_id), + ) except Exception: logger.exception("Error pushing receipts to remote servers") diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 24a4cb5a83..c0e06929bd 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -460,7 +460,7 @@ class RegistrationHandler(BaseHandler): lines = response.split('\n') json = { "valid": lines[0] == 'true', - "error_url": "http://www.google.com/recaptcha/api/challenge?" + + "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" + "error=%s" % lines[1] } defer.returnValue(json) @@ -471,7 +471,7 @@ class RegistrationHandler(BaseHandler): Used only by c/s api v1 """ data = yield self.captcha_client.post_urlencoded_get_raw( - "http://www.google.com:80/recaptcha/api/verify", + "http://www.recaptcha.net:80/recaptcha/api/verify", args={ 'privatekey': private_key, 'remoteip': ip_addr, diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 13e212d669..afa508d729 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -50,16 +50,17 @@ class RoomListHandler(BaseHandler): def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMPTY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID, + from_federation=False): """Generate a local public room list. There are multiple different lists: the main one plus one per third party network. A client can ask for a specific list or to return all. Args: - limit (int) - since_token (str) - search_filter (dict) + limit (int|None) + since_token (str|None) + search_filter (dict|None) network_tuple (ThirdPartyInstanceID): Which public list to use. This can be (None, None) to indicate the main list, or a particular appservice and network id to use an appservice specific one. @@ -87,14 +88,30 @@ class RoomListHandler(BaseHandler): return self.response_cache.wrap( key, self._get_public_room_list, - limit, since_token, network_tuple=network_tuple, + limit, since_token, + network_tuple=network_tuple, from_federation=from_federation, ) @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, network_tuple=EMPTY_THIRD_PARTY_ID, + from_federation=False, timeout=None,): + """Generate a public room list. + Args: + limit (int|None): Maximum amount of rooms to return. + since_token (str|None) + search_filter (dict|None): Dictionary to filter rooms by. + network_tuple (ThirdPartyInstanceID): Which public list to use. + This can be (None, None) to indicate the main list, or a particular + appservice and network id to use an appservice specific one. + Setting to None returns all public rooms across all lists. + from_federation (bool): Whether this request originated from a + federating server or a client. Used for room filtering. + timeout (int|None): Amount of seconds to wait for a response before + timing out. + """ if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -217,7 +234,8 @@ class RoomListHandler(BaseHandler): yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], - chunk, limit, search_filter + chunk, limit, search_filter, + from_federation=from_federation, ), batch, 5, ) @@ -288,23 +306,51 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, - search_filter): + search_filter, from_federation=False): """Generate the entry for a room in the public room list and append it to the `chunk` if it matches the search filter + + Args: + room_id (str): The ID of the room. + num_joined_users (int): The number of joined users in the room. + chunk (list) + limit (int|None): Maximum amount of rooms to display. Function will + return if length of chunk is greater than limit + 1. + search_filter (dict|None) + from_federation (bool): Whether this request originated from a + federating server or a client. Used for room filtering. """ if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return result = yield self.generate_room_entry(room_id, num_joined_users) + if not result: + return + + if from_federation and not result.get("m.federate", True): + # This is a room that other servers cannot join. Do not show them + # this room. + return - if result and _matches_room_entry(result, search_filter): + if _matches_room_entry(result, search_filter): chunk.append(result) @cachedInlineCallbacks(num_args=1, cache_context=True) def generate_room_entry(self, room_id, num_joined_users, cache_context, with_alias=True, allow_private=False): """Returns the entry for a room + + Args: + room_id (str): The room's ID. + num_joined_users (int): Number of users in the room. + cache_context: Information for cached responses. + with_alias (bool): Whether to return the room's aliases in the result. + allow_private (bool): Whether invite-only rooms should be shown. + + Returns: + Deferred[dict|None]: Returns a room entry as a dictionary, or None if this + room was determined not to be shown publicly. """ result = { "room_id": room_id, @@ -318,6 +364,7 @@ class RoomListHandler(BaseHandler): event_map = yield self.store.get_events([ event_id for key, event_id in iteritems(current_state_ids) if key[0] in ( + EventTypes.Create, EventTypes.JoinRules, EventTypes.Name, EventTypes.Topic, @@ -334,12 +381,17 @@ class RoomListHandler(BaseHandler): } # Double check that this is actually a public room. + join_rules_event = current_state.get((EventTypes.JoinRules, "")) if join_rules_event: join_rule = join_rules_event.content.get("join_rule", None) if not allow_private and join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) + # Return whether this room is open to federation users or not + create_event = current_state.get((EventTypes.Create, "")) + result["m.federate"] = create_event.content.get("m.federate", True) + if with_alias: aliases = yield self.store.get_aliases_for_room( room_id, on_invalidate=cache_context.invalidate diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 384d8a37a2..1334c630cc 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -68,9 +68,13 @@ class MatrixFederationAgent(object): TLS policy to use for fetching .well-known files. None to use a default (browser-like) implementation. - srv_resolver (SrvResolver|None): + _srv_resolver (SrvResolver|None): SRVResolver impl to use for looking up SRV records. None to use a default implementation. + + _well_known_cache (TTLCache|None): + TTLCache impl for storing cached well-known lookups. None to use a default + implementation. """ def __init__( diff --git a/synapse/http/server.py b/synapse/http/server.py index 6c67a25a11..16fb7935da 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -169,18 +169,18 @@ def _return_html_error(f, request): ) else: logger.error( - "Failed handle request %r: %s", + "Failed handle request %r", request, - f.getTraceback().rstrip(), + exc_info=(f.type, f.value, f.getTracebackObject()), ) else: code = http_client.INTERNAL_SERVER_ERROR msg = "Internal server error" logger.error( - "Failed handle request %r: %s", + "Failed handle request %r", request, - f.getTraceback().rstrip(), + exc_info=(f.type, f.value, f.getTracebackObject()), ) body = HTML_ERROR_TEMPLATE.format( diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 98d8d9560b..e65f8c63d3 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -32,9 +32,25 @@ if six.PY3: logger = logging.getLogger(__name__) -http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "") +http_push_processed_counter = Counter( + "synapse_http_httppusher_http_pushes_processed", + "Number of push notifications successfully sent", +) -http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "") +http_push_failed_counter = Counter( + "synapse_http_httppusher_http_pushes_failed", + "Number of push notifications which failed", +) + +http_badges_processed_counter = Counter( + "synapse_http_httppusher_badge_updates_processed", + "Number of badge updates successfully sent", +) + +http_badges_failed_counter = Counter( + "synapse_http_httppusher_badge_updates_failed", + "Number of badge updates which failed", +) class HttpPusher(object): @@ -81,6 +97,11 @@ class HttpPusher(object): pusherdict['pushkey'], ) + if self.data is None: + raise PusherConfigException( + "data can not be null for HTTP pusher" + ) + if 'url' not in self.data: raise PusherConfigException( "'url' required in data for HTTP pusher" @@ -346,6 +367,10 @@ class HttpPusher(object): @defer.inlineCallbacks def _send_badge(self, badge): + """ + Args: + badge (int): number of unread messages + """ logger.info("Sending updated badge count %d to %s", badge, self.name) d = { 'notification': { @@ -366,14 +391,11 @@ class HttpPusher(object): } } try: - resp = yield self.http_client.post_json_get_json(self.url, d) + yield self.http_client.post_json_get_json(self.url, d) + http_badges_processed_counter.inc() except Exception as e: logger.warning( "Failed to send badge count to %s: %s %s", self.name, type(e), e, ) - defer.returnValue(False) - rejected = [] - if 'rejected' in resp: - rejected = resp['rejected'] - defer.returnValue(rejected) + http_badges_failed_counter.inc() diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index 368d5094be..b33f2a357b 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -56,7 +56,7 @@ class PusherFactory(object): f = self.pusher_types.get(kind, None) if not f: return None - logger.info("creating %s pusher for %r", kind, pusherdict) + logger.debug("creating %s pusher for %r", kind, pusherdict) return f(self.hs, pusherdict) def _create_email_pusher(self, _hs, pusherdict): diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 5a4e73ccd6..abf1a1a9c1 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,6 +19,7 @@ import logging from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.push import PusherConfigException from synapse.push.pusher import PusherFactory logger = logging.getLogger(__name__) @@ -140,6 +141,10 @@ class PusherPool: @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): + if not self.pushers: + # nothing to do here. + return + try: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id @@ -155,6 +160,10 @@ class PusherPool: @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): + if not self.pushers: + # nothing to do here. + return + try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive @@ -214,6 +223,15 @@ class PusherPool: """ try: p = self.pusher_factory.create_pusher(pusherdict) + except PusherConfigException as e: + logger.warning( + "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s", + pusherdict.get('user_name'), + pusherdict.get('app_id'), + pusherdict.get('pushkey'), + e, + ) + return except Exception: logger.exception("Couldn't start a pusher: caught Exception") return diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 1353a32d00..817d1f67f9 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -59,12 +59,7 @@ class BaseSlavedStore(SQLBaseStore): members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) else: - try: - getattr(self, row.cache_func).invalidate(tuple(row.keys)) - except AttributeError: - # We probably haven't pulled in the cache in this worker, - # which is fine. - pass + self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys)) def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 92447b00d4..9e530defe0 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -54,8 +54,11 @@ class SlavedPresenceStore(BaseSlavedStore): def stream_positions(self): result = super(SlavedPresenceStore, self).stream_positions() - position = self._presence_id_gen.get_current_token() - result["presence"] = position + + if self.hs.config.use_presence: + position = self._presence_id_gen.get_current_token() + result["presence"] = position + return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 586dddb40b..e558f90e1a 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory): Accepts a handler that will be called when new data is available or data is required. """ - maxDelay = 5 # Try at least once every N seconds + maxDelay = 30 # Try at least once every N seconds def __init__(self, hs, client_name, handler): self.client_name = client_name @@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory): def buildProtocol(self, addr): logger.info("Connected to replication: %r", addr) - self.resetDelay() return ClientReplicationStreamProtocol( self.client_name, self.server_name, self._clock, self.handler ) @@ -90,15 +89,18 @@ class ReplicationClientHandler(object): # Used for tests. self.awaiting_syncs = {} + # The factory used to create connections. + self.factory = None + def start_replication(self, hs): """Helper method to start a replication connection to the remote server using TCP. """ client_name = hs.config.worker_name - factory = ReplicationClientFactory(hs, client_name, self) + self.factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - hs.get_reactor().connectTCP(host, port, factory) + hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes @@ -140,6 +142,7 @@ class ReplicationClientHandler(object): args["account_data"] = user_account_data elif room_account_data: args["account_data"] = room_account_data + return args def get_currently_syncing_users(self): @@ -204,3 +207,14 @@ class ReplicationClientHandler(object): for cmd in self.pending_commands: connection.send_command(cmd) self.pending_commands = [] + + def finished_connecting(self): + """Called when we have successfully subscribed and caught up to all + streams we're interested in. + """ + logger.info("Finished connecting to server") + + # We don't reset the delay any earlier as otherwise if there is a + # problem during start up we'll end up tight looping connecting to the + # server. + self.factory.resetDelay() diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 327556f6a1..2098c32a77 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -127,8 +127,11 @@ class RdataCommand(Command): class PositionCommand(Command): - """Sent by the client to tell the client the stream postition without + """Sent by the server to tell the client the stream postition without needing to send an RDATA. + + Sent to the client after all missing updates for a stream have been sent + to the client and they're now up to date. """ NAME = "POSITION" diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 0b3fe6cbf5..49ae5b3355 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -268,7 +268,17 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): if "\n" in string: raise Exception("Unexpected newline in command: %r", string) - self.sendLine(string.encode("utf-8")) + encoded_string = string.encode("utf-8") + + if len(encoded_string) > self.MAX_LENGTH: + raise Exception( + "Failed to send command %s as too long (%d > %d)" % ( + cmd.NAME, + len(encoded_string), self.MAX_LENGTH, + ) + ) + + self.sendLine(encoded_string) self.last_sent_command = self.clock.time_msec() @@ -361,6 +371,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def id(self): return "%s-%s" % (self.name, self.conn_id) + def lineLengthExceeded(self, line): + """Called when we receive a line that is above the maximum line length + """ + self.send_error("Line length exceeded") + class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS @@ -511,6 +526,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.server_name = server_name self.handler = handler + # Set of stream names that have been subscribe to, but haven't yet + # caught up with. This is used to track when the client has been fully + # connected to the remote. + self.streams_connecting = set() + # Map of stream to batched updates. See RdataCommand for info on how # batching works. self.pending_batches = {} @@ -533,6 +553,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # We've now finished connecting to so inform the client handler self.handler.update_connection(self) + # This will happen if we don't actually subscribe to any streams + if not self.streams_connecting: + self.handler.finished_connecting() + def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) @@ -562,6 +586,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): return self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): + # When we get a `POSITION` command it means we've finished getting + # missing updates for the given stream, and are now up to date. + self.streams_connecting.discard(cmd.stream_name) + if not self.streams_connecting: + self.handler.finished_connecting() + return self.handler.on_position(cmd.stream_name, cmd.token) def on_SYNC(self, cmd): @@ -578,6 +608,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.id(), stream_name, token ) + self.streams_connecting.add(stream_name) + self.send_command(ReplicateCommand(stream_name, token)) def on_connection_closed(self): diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index f7bb710642..ac035c7735 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -33,7 +33,7 @@ RECAPTCHA_TEMPLATE = """ <title>Authentication</title> <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> -<script src="https://www.google.com/recaptcha/api.js" +<script src="https://www.recaptcha.net/recaptcha/api.js" async defer></script> <script src="//code.jquery.com/jquery-1.11.2.min.js"></script> <link rel="stylesheet" href="/_matrix/static/client/register/style.css"> diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index efe42a429d..fece1ef0b8 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -133,8 +134,15 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam logger.debug("Responding to media request with responder %s") add_file_headers(request, media_type, file_size, upload_name) - with responder: - yield responder.write_to_consumer(request) + try: + with responder: + yield responder.write_to_consumer(request) + except Exception as e: + # The majority of the time this will be due to the client having gone + # away. Unfortunately, Twisted simply throws a generic exception at us + # in that case. + logger.warning("Failed to write to consumer: %s %s", type(e), e) + finish_request(request) @@ -206,8 +214,7 @@ def get_filename_from_headers(headers): Content-Disposition HTTP header. Args: - headers (twisted.web.http_headers.Headers): The HTTP - request headers. + headers (dict[bytes, list[bytes]]): The HTTP request headers. Returns: A Unicode string of the filename, or None. @@ -218,23 +225,12 @@ def get_filename_from_headers(headers): if not content_disposition[0]: return - # dict of unicode: bytes, corresponding to the key value sections of the - # Content-Disposition header. - params = {} - parts = content_disposition[0].split(b";") - for i in parts: - # Split into key-value pairs, if able - # We don't care about things like `inline`, so throw it out - if b"=" not in i: - continue - - key, value = i.strip().split(b"=") - params[key.decode('ascii')] = value + _, params = _parse_header(content_disposition[0]) upload_name = None # First check if there is a valid UTF-8 filename - upload_name_utf8 = params.get("filename*", None) + upload_name_utf8 = params.get(b"filename*", None) if upload_name_utf8: if upload_name_utf8.lower().startswith(b"utf-8''"): upload_name_utf8 = upload_name_utf8[7:] @@ -260,12 +256,68 @@ def get_filename_from_headers(headers): # If there isn't check for an ascii name. if not upload_name: - upload_name_ascii = params.get("filename", None) + upload_name_ascii = params.get(b"filename", None) if upload_name_ascii and is_ascii(upload_name_ascii): - # Make sure there's no %-quoted bytes. If there is, reject it as - # non-valid ASCII. - if b"%" not in upload_name_ascii: - upload_name = upload_name_ascii.decode('ascii') + upload_name = upload_name_ascii.decode('ascii') # This may be None here, indicating we did not find a matching name. return upload_name + + +def _parse_header(line): + """Parse a Content-type like header. + + Cargo-culted from `cgi`, but works on bytes rather than strings. + + Args: + line (bytes): header to be parsed + + Returns: + Tuple[bytes, dict[bytes, bytes]]: + the main content-type, followed by the parameter dictionary + """ + parts = _parseparam(b';' + line) + key = next(parts) + pdict = {} + for p in parts: + i = p.find(b'=') + if i >= 0: + name = p[:i].strip().lower() + value = p[i + 1:].strip() + + # strip double-quotes + if len(value) >= 2 and value[0:1] == value[-1:] == b'"': + value = value[1:-1] + value = value.replace(b'\\\\', b'\\').replace(b'\\"', b'"') + pdict[name] = value + + return key, pdict + + +def _parseparam(s): + """Generator which splits the input on ;, respecting double-quoted sequences + + Cargo-culted from `cgi`, but works on bytes rather than strings. + + Args: + s (bytes): header to be parsed + + Returns: + Iterable[bytes]: the split input + """ + while s[:1] == b';': + s = s[1:] + + # look for the next ; + end = s.find(b';') + + # if there is an odd number of " marks between here and the next ;, skip to the + # next ; instead + while end > 0 and (s.count(b'"', 0, end) - s.count(b'\\"', 0, end)) % 2: + end = s.find(b';', end + 1) + + if end < 0: + end = len(s) + f = s[:end] + yield f.strip() + s = s[end:] diff --git a/synapse/server.pyi b/synapse/server.pyi index 06cd083a74..fb8df56cd5 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -7,9 +7,9 @@ import synapse.handlers.auth import synapse.handlers.deactivate_account import synapse.handlers.device import synapse.handlers.e2e_keys +import synapse.handlers.message import synapse.handlers.room import synapse.handlers.room_member -import synapse.handlers.message import synapse.handlers.set_password import synapse.rest.media.v1.media_repository import synapse.server_notices.server_notices_manager diff --git a/synapse/static/client/register/index.html b/synapse/static/client/register/index.html index 886f2edd1f..6edc4deb03 100644 --- a/synapse/static/client/register/index.html +++ b/synapse/static/client/register/index.html @@ -4,7 +4,7 @@ <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> <link rel="stylesheet" href="style.css"> <script src="js/jquery-2.1.3.min.js"></script> -<script src="https://www.google.com/recaptcha/api/js/recaptcha_ajax.js"></script> +<script src="https://www.recaptcha.net/recaptcha/api/js/recaptcha_ajax.js"></script> <script src="register_config.js"></script> <script src="js/register.js"></script> </head> diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3d895da43c..a0333d5309 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -30,6 +30,7 @@ from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id +from synapse.util import batch_iter from synapse.util.caches.descriptors import Cache from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.stringutils import exception_to_unicode @@ -1327,10 +1328,16 @@ class SQLBaseStore(object): """ txn.call_after(self._invalidate_state_caches, room_id, members_changed) - keys = itertools.chain([room_id], members_changed) - self._send_invalidation_to_replication( - txn, _CURRENT_STATE_CACHE_NAME, keys, - ) + # We need to be careful that the size of the `members_changed` list + # isn't so large that it causes problems sending over replication, so we + # send them in chunks. + # Max line length is 16K, and max user ID length is 255, so 50 should + # be safe. + for chunk in batch_iter(members_changed, 50): + keys = itertools.chain([room_id], chunk) + self._send_invalidation_to_replication( + txn, _CURRENT_STATE_CACHE_NAME, keys, + ) def _invalidate_state_caches(self, room_id, members_changed): """Invalidates caches that are based on the current state, but does @@ -1342,15 +1349,43 @@ class SQLBaseStore(object): changed """ for member in members_changed: - self.get_rooms_for_user_with_stream_ordering.invalidate((member,)) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", (member,), + ) for host in set(get_domain_from_id(u) for u in members_changed): - self.is_host_joined.invalidate((room_id, host)) - self.was_host_joined.invalidate((room_id, host)) + self._attempt_to_invalidate_cache( + "is_host_joined", (room_id, host,), + ) + self._attempt_to_invalidate_cache( + "was_host_joined", (room_id, host,), + ) + + self._attempt_to_invalidate_cache( + "get_users_in_room", (room_id,), + ) + self._attempt_to_invalidate_cache( + "get_room_summary", (room_id,), + ) + self._attempt_to_invalidate_cache( + "get_current_state_ids", (room_id,), + ) + + def _attempt_to_invalidate_cache(self, cache_name, key): + """Attempts to invalidate the cache of the given name, ignoring if the + cache doesn't exist. Mainly used for invalidating caches on workers, + where they may not have the cache. - self.get_users_in_room.invalidate((room_id,)) - self.get_room_summary.invalidate((room_id,)) - self.get_current_state_ids.invalidate((room_id,)) + Args: + cache_name (str) + key (tuple) + """ + try: + getattr(self, cache_name).invalidate(key) + except AttributeError: + # We probably haven't pulled in the cache in this worker, + # which is fine. + pass def _send_invalidation_to_replication(self, txn, cache_name, keys): """Notifies replication that given cache has been invalidated. @@ -1568,6 +1603,14 @@ class SQLBaseStore(object): return cls.cursor_to_dict(txn) + @property + def database_engine_name(self): + return self.database_engine.module.__name__ + + def get_server_version(self): + """Returns a string describing the server version number""" + return self.database_engine.server_version + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 4004427c7b..dc3238501c 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -23,6 +23,7 @@ class PostgresEngine(object): self.module = database_module self.module.extensions.register_type(self.module.extensions.UNICODE) self.synchronous_commit = database_config.get("synchronous_commit", True) + self._version = None # unknown as yet def check_database(self, txn): txn.execute("SHOW SERVER_ENCODING") @@ -87,3 +88,27 @@ class PostgresEngine(object): """ txn.execute("SELECT nextval('state_group_id_seq')") return txn.fetchone()[0] + + @property + def server_version(self): + """Returns a string giving the server version. For example: '8.1.5' + + Returns: + string + """ + # note that this is a bit of a hack because it relies on on_new_connection + # having been called at least once. Still, that should be a safe bet here. + numver = self._version + assert numver is not None + + # https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION + if numver >= 100000: + return "%i.%i" % ( + numver / 10000, numver % 10000, + ) + else: + return "%i.%i.%i" % ( + numver / 10000, + (numver % 10000) / 100, + numver % 100, + ) diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 059ab81055..1bcd5b99a4 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -70,6 +70,15 @@ class Sqlite3Engine(object): self._current_state_group_id += 1 return self._current_state_group_id + @property + def server_version(self): + """Gets a string giving the server version. For example: '3.22.0' + + Returns: + string + """ + return "%i.%i.%i" % self.module.sqlite_version_info + # Following functions taken from: https://github.com/coleifer/peewee diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9b9572890b..9b6c28892c 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -295,6 +295,39 @@ class RegistrationWorkerStore(SQLBaseStore): return ret['user_id'] return None + @defer.inlineCallbacks + def user_add_threepid(self, user_id, medium, address, validated_at, added_at): + yield self._simple_upsert("user_threepids", { + "medium": medium, + "address": address, + }, { + "user_id": user_id, + "validated_at": validated_at, + "added_at": added_at, + }) + + @defer.inlineCallbacks + def user_get_threepids(self, user_id): + ret = yield self._simple_select_list( + "user_threepids", { + "user_id": user_id + }, + ['medium', 'address', 'validated_at', 'added_at'], + 'user_get_threepids' + ) + defer.returnValue(ret) + + def user_delete_threepid(self, user_id, medium, address): + return self._simple_delete( + "user_threepids", + keyvalues={ + "user_id": user_id, + "medium": medium, + "address": address, + }, + desc="user_delete_threepids", + ) + class RegistrationStore(RegistrationWorkerStore, background_updates.BackgroundUpdateStore): @@ -633,39 +666,6 @@ class RegistrationStore(RegistrationWorkerStore, defer.returnValue(res if res else False) @defer.inlineCallbacks - def user_add_threepid(self, user_id, medium, address, validated_at, added_at): - yield self._simple_upsert("user_threepids", { - "medium": medium, - "address": address, - }, { - "user_id": user_id, - "validated_at": validated_at, - "added_at": added_at, - }) - - @defer.inlineCallbacks - def user_get_threepids(self, user_id): - ret = yield self._simple_select_list( - "user_threepids", { - "user_id": user_id - }, - ['medium', 'address', 'validated_at', 'added_at'], - 'user_get_threepids' - ) - defer.returnValue(ret) - - def user_delete_threepid(self, user_id, medium, address): - return self._simple_delete( - "user_threepids", - keyvalues={ - "user_id": user_id, - "medium": medium, - "address": address, - }, - desc="user_delete_threepids", - ) - - @defer.inlineCallbacks def save_or_get_3pid_guest_access_token( self, medium, address, access_token, inviter_user_id ): |