diff --git a/synapse/__init__.py b/synapse/__init__.py
index 2004375f98..25c10244d3 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
except ImportError:
pass
-__version__ = "0.99.1.1"
+__version__ = "0.99.2"
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 043b48f8f3..5070094cad 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -48,6 +48,7 @@ from synapse.rest.client.v1.room import (
RoomMemberListRestServlet,
RoomStateRestServlet,
)
+from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -96,6 +97,7 @@ class ClientReaderServer(HomeServer):
RoomEventContextServlet(self).register(resource)
RegisterRestServlet(self).register(resource)
LoginRestServlet(self).register(resource)
+ ThreepidRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index b116c17669..7da79dc827 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
-from synapse.api.urls import FEDERATION_PREFIX
+from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer):
),
})
+ if name in ["keys", "federation"]:
+ resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
+
root_resource = create_resource_tree(resources, NoResource())
_base.listen_tcp(
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index d5b954361d..8479fee738 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -66,10 +66,15 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
headers = {
"Authorization": auth_headers,
}
- result = yield self.http_client.get_json(
- self.main_uri + request.uri.decode('ascii'),
- headers=headers,
- )
+
+ try:
+ result = yield self.http_client.get_json(
+ self.main_uri + request.uri.decode('ascii'),
+ headers=headers,
+ )
+ except HttpResponseException as e:
+ raise e.to_synapse_error()
+
defer.returnValue((200, result))
@defer.inlineCallbacks
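
Note: the hunk above catches `HttpResponseException` from the proxied request and re-raises it via `to_synapse_error()`, so the original status code and errcode reach the client instead of a generic 500 from the worker. A minimal sketch of the idea, using simplified stand-ins rather than Synapse's real exception classes:

```python
import json


class SynapseError(Exception):
    """Simplified stand-in for synapse.api.errors.SynapseError."""
    def __init__(self, code, msg, errcode="M_UNKNOWN"):
        super(SynapseError, self).__init__(msg)
        self.code, self.msg, self.errcode = code, msg, errcode


class HttpResponseException(Exception):
    """Simplified stand-in for synapse.api.errors.HttpResponseException."""
    def __init__(self, code, msg, response_body):
        super(HttpResponseException, self).__init__(msg)
        self.code, self.msg, self.response = code, msg, response_body

    def to_synapse_error(self):
        # Re-parse the remote error body so the client sees the original
        # code/errcode rather than a generic 500 from the proxy.
        try:
            body = json.loads(self.response)
            return SynapseError(self.code, body["error"], body["errcode"])
        except Exception:
            return SynapseError(self.code, self.msg)


def proxy_request():
    try:
        # imagine this is the get_json call to the main process failing
        raise HttpResponseException(
            403, "Forbidden", '{"errcode": "M_FORBIDDEN", "error": "nope"}'
        )
    except HttpResponseException as e:
        raise e.to_synapse_error()
```
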
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 05a97979ec..e8b6cc3114 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -555,6 +555,9 @@ def run(hs):
stats["memory_rss"] += process.memory_info().rss
stats["cpu_average"] += int(process.cpu_percent(interval=None))
+ stats["database_engine"] = hs.get_datastore().database_engine_name
+ stats["database_server_version"] = hs.get_datastore().get_server_version()
+
logger.info("Reporting stats to matrix.org: %s" % (stats,))
try:
yield hs.get_simple_http_client().put_json(
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index 4064891ffb..d25196be08 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -47,5 +47,5 @@ class CaptchaConfig(Config):
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
- recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
+ recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
"""
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 8d5d287357..40045de7ac 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -19,6 +19,8 @@ import warnings
from datetime import datetime
from hashlib import sha256
+import six
+
from unpaddedbase64 import encode_base64
from OpenSSL import crypto
@@ -36,9 +38,11 @@ class TlsConfig(Config):
acme_config = {}
self.acme_enabled = acme_config.get("enabled", False)
- self.acme_url = acme_config.get(
+
+ # hyperlink complains on py2 if this is not a unicode string
+ self.acme_url = six.text_type(acme_config.get(
"url", u"https://acme-v01.api.letsencrypt.org/directory"
- )
+ ))
self.acme_port = acme_config.get("port", 80)
self.acme_bind_addresses = acme_config.get("bind_addresses", ['::', '0.0.0.0'])
self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30)
@@ -55,7 +59,7 @@ class TlsConfig(Config):
)
if not self.tls_private_key_file:
raise ConfigError(
- "tls_certificate_path must be specified if TLS-enabled listeners are "
+ "tls_private_key_path must be specified if TLS-enabled listeners are "
"configured."
)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index cce40fdd2d..7474fd515f 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -17,6 +17,7 @@
import logging
from collections import namedtuple
+from six import raise_from
from six.moves import urllib
from signedjson.key import (
@@ -35,7 +36,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
-from synapse.api.errors import Codes, RequestSendFailed, SynapseError
+from synapse.api.errors import (
+ Codes,
+ HttpResponseException,
+ RequestSendFailed,
+ SynapseError,
+)
from synapse.util import logcontext, unwrapFirstError
from synapse.util.logcontext import (
LoggingContext,
@@ -44,6 +50,7 @@ from synapse.util.logcontext import (
run_in_background,
)
from synapse.util.metrics import Measure
+from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -367,13 +374,18 @@ class Keyring(object):
server_name_and_key_ids, perspective_name, perspective_keys
)
defer.returnValue(result)
+ except KeyLookupError as e:
+ logger.warning(
+ "Key lookup failed from %r: %s", perspective_name, e,
+ )
except Exception as e:
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
type(e).__name__, str(e),
)
- defer.returnValue({})
+
+ defer.returnValue({})
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
@@ -421,21 +433,30 @@ class Keyring(object):
# TODO(mark): Set the minimum_valid_until_ts to that needed by
# the events being validated or the current time if validating
# an incoming request.
- query_response = yield self.client.post_json(
- destination=perspective_name,
- path="/_matrix/key/v2/query",
- data={
- u"server_keys": {
- server_name: {
- key_id: {
- u"minimum_valid_until_ts": 0
- } for key_id in key_ids
+ try:
+ query_response = yield self.client.post_json(
+ destination=perspective_name,
+ path="/_matrix/key/v2/query",
+ data={
+ u"server_keys": {
+ server_name: {
+ key_id: {
+ u"minimum_valid_until_ts": 0
+ } for key_id in key_ids
+ }
+ for server_name, key_ids in server_names_and_key_ids
}
- for server_name, key_ids in server_names_and_key_ids
- }
- },
- long_retries=True,
- )
+ },
+ long_retries=True,
+ )
+ except (NotRetryingDestination, RequestSendFailed) as e:
+ raise_from(
+ KeyLookupError("Failed to connect to remote server"), e,
+ )
+ except HttpResponseException as e:
+ raise_from(
+ KeyLookupError("Remote server returned an error"), e,
+ )
keys = {}
@@ -502,11 +523,20 @@ class Keyring(object):
if requested_key_id in keys:
continue
- response = yield self.client.get_json(
- destination=server_name,
- path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
- ignore_backoff=True,
- )
+ try:
+ response = yield self.client.get_json(
+ destination=server_name,
+ path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
+ ignore_backoff=True,
+ )
+ except (NotRetryingDestination, RequestSendFailed) as e:
+ raise_from(
+ KeyLookupError("Failed to connect to remote server"), e,
+ )
+ except HttpResponseException as e:
+ raise_from(
+ KeyLookupError("Remote server returned an error"), e,
+ )
if (u"signatures" not in response
or server_name not in response[u"signatures"]):
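
Note: the `raise_from` calls above translate low-level transport failures into `KeyLookupError` while keeping the original exception chained as the cause (exposed as `__cause__` on py3). A small self-contained sketch of the same pattern:

```python
from six import raise_from


class KeyLookupError(Exception):
    pass


def fetch_key(get_json, server_name):
    try:
        return get_json(server_name)
    except ConnectionError as e:  # ConnectionError is a py3 builtin
        # Callers only need to know that the lookup failed; the original
        # exception stays available on the chain for debugging.
        raise_from(KeyLookupError("Failed to connect to remote server"), e)


def _always_fails(server_name):
    raise ConnectionError("connection refused")


try:
    fetch_key(_always_fails, "example.org")
except KeyLookupError as e:
    assert isinstance(e.__cause__, ConnectionError)  # py3 behaviour
```
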
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 4e4f58b418..58e04d81ab 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -33,6 +33,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import (
CodeMessageException,
+ Codes,
FederationDeniedError,
HttpResponseException,
SynapseError,
@@ -792,10 +793,25 @@ class FederationClient(FederationBase):
defer.returnValue(content)
except HttpResponseException as e:
if e.code in [400, 404]:
+ err = e.to_synapse_error()
+
+ # If we receive an error response that isn't a generic error, we
+ # assume that the remote understands the v2 invite API and this
+ # is a legitimate error.
+ if err.errcode != Codes.UNKNOWN:
+ raise err
+
+ # Otherwise, we assume that the remote server doesn't understand
+ # the v2 invite API.
+
if room_version in (RoomVersions.V1, RoomVersions.V2):
pass # We'll fall through
else:
- raise Exception("Remote server is too old")
+ raise SynapseError(
+ 400,
+ "User's homeserver does not support this room version",
+ Codes.UNSUPPORTED_ROOM_VERSION,
+ )
elif e.code == 403:
raise e.to_synapse_error()
else:
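
Note: the fallback logic above distinguishes a remote that understood the v2 invite endpoint but genuinely rejected the request (a specific errcode) from one that has no such endpoint (a generic `M_UNKNOWN`), and only retries on the v1 API for room versions v1 can carry. A sketch of that decision as a standalone predicate, with hypothetical stand-ins for the constants:

```python
UNKNOWN = "M_UNKNOWN"
V1_COMPATIBLE_VERSIONS = ("1", "2")  # assumption: versions the v1 API can carry


def should_fall_back_to_v1(http_code, errcode, room_version):
    if http_code not in (400, 404):
        return False
    if errcode != UNKNOWN:
        # The remote understood the v2 endpoint and returned a real error:
        # surface it rather than retrying on v1.
        return False
    # A generic error suggests the endpoint itself is unknown; only fall
    # back if the room version predates the need for the v2 API.
    return room_version in V1_COMPATIBLE_VERSIONS


assert should_fall_back_to_v1(404, UNKNOWN, "1") is True
assert should_fall_back_to_v1(400, "M_FORBIDDEN", "1") is False
assert should_fall_back_to_v1(404, UNKNOWN, "3") is False
```
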
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 3da86d4ba6..81f3b4b1ff 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -25,9 +25,10 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.errors import (
AuthError,
+ Codes,
FederationError,
IncompatibleRoomVersionError,
NotFoundError,
@@ -239,8 +240,9 @@ class FederationServer(FederationBase):
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
- "Failed to handle PDU %s: %s",
- event_id, f.getTraceback().rstrip(),
+ "Failed to handle PDU %s",
+ event_id,
+ exc_info=(f.type, f.value, f.getTracebackObject()),
)
yield concurrently_execute(
@@ -386,6 +388,13 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_invite_request(self, origin, content, room_version):
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ raise SynapseError(
+ 400,
+ "Homeserver does not support this room version",
+ Codes.UNSUPPORTED_ROOM_VERSION,
+ )
+
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
@@ -877,6 +886,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
+ if not self.config.use_presence and edu_type == "m.presence":
+ return
+
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(
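
Note: the logging hunk above (like those in `pagination.py` and `http/server.py` later in this diff) stops interpolating a pre-rendered traceback string and instead passes `exc_info=(f.type, f.value, f.getTracebackObject())`, letting the stdlib logging machinery format the traceback. A minimal sketch of logging a Twisted `Failure` this way:

```python
import logging

from twisted.python import failure

logger = logging.getLogger(__name__)

try:
    raise ValueError("boom")
except ValueError:
    f = failure.Failure()  # captures the exception currently being handled
    logger.error(
        "Failed to handle PDU %s",
        "$some_event_id",
        exc_info=(f.type, f.value, f.getTracebackObject()),
    )
```
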
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a2396ab466..ebb81be377 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -393,7 +393,7 @@ class FederationStateServlet(BaseFederationServlet):
return self.handler.on_context_state_request(
origin,
context,
- parse_string_from_args(query, "event_id", None),
+ parse_string_from_args(query, "event_id", None, required=True),
)
@@ -404,7 +404,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
return self.handler.on_state_ids_request(
origin,
room_id,
- parse_string_from_args(query, "event_id", None),
+ parse_string_from_args(query, "event_id", None, required=True),
)
@@ -736,7 +736,8 @@ class PublicRoomList(BaseFederationServlet):
data = yield self.handler.get_local_public_room_list(
limit, since_token,
- network_tuple=network_tuple
+ network_tuple=network_tuple,
+ from_federation=True,
)
defer.returnValue((200, data))
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 633c865ed8..a7eaead56b 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -113,8 +113,7 @@ class GroupsServerHandler(object):
room_id = room_entry["room_id"]
joined_users = yield self.store.get_users_in_room(room_id)
entry = yield self.room_list_handler.generate_room_entry(
- room_id, len(joined_users),
- with_alias=False, allow_private=True,
+ room_id, len(joined_users), with_alias=False, allow_private=True,
)
entry = dict(entry) # so we don't change whats cached
entry.pop("room_id", None)
@@ -544,8 +543,7 @@ class GroupsServerHandler(object):
joined_users = yield self.store.get_users_in_room(room_id)
entry = yield self.room_list_handler.generate_room_entry(
- room_id, len(joined_users),
- with_alias=False, allow_private=True,
+ room_id, len(joined_users), with_alias=False, allow_private=True,
)
if not entry:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index de839ca527..0425380e55 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -770,10 +770,26 @@ class FederationHandler(BaseHandler):
set(auth_events.keys()) | set(state_events.keys())
)
+ # We now have a chunk of events plus associated state and auth chain to
+ # persist. We do the persistence in two steps:
+ # 1. Auth events and state get persisted as outliers, plus the
+ # backward extremities get persisted (as non-outliers).
+ # 2. The rest of the events in the chunk get persisted one by one, as
+ # each one depends on the previous event for its state.
+ #
+ # The important thing is that events in the chunk get persisted as
+ # non-outliers, including when those events are also in the state or
+ # auth chain. Caution must therefore be taken to ensure that they are
+ # not accidentally marked as outliers.
+
+ # Step 1a: persist auth events that *don't* appear in the chunk
ev_infos = []
for a in auth_events.values():
- if a.event_id in seen_events:
+ # We only want to persist auth events as outliers that we haven't
+ # seen and aren't about to persist as part of the backfilled chunk.
+ if a.event_id in seen_events or a.event_id in event_map:
continue
+
a.internal_metadata.outlier = True
ev_infos.append({
"event": a,
@@ -785,14 +801,21 @@ class FederationHandler(BaseHandler):
}
})
+ # Step 1b: persist the events in the chunk we fetched state for (i.e.
+ # the backwards extremities) as non-outliers.
for e_id in events_to_state:
+ # For paranoia we ensure that these events are marked as
+ # non-outliers
+ ev = event_map[e_id]
+ assert(not ev.internal_metadata.is_outlier())
+
ev_infos.append({
- "event": event_map[e_id],
+ "event": ev,
"state": events_to_state[e_id],
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
- for a_id in event_map[e_id].auth_event_ids()
+ for a_id in ev.auth_event_ids()
if a_id in auth_events
}
})
@@ -802,12 +825,17 @@ class FederationHandler(BaseHandler):
backfilled=True,
)
+ # Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
for event in events:
if event in events_to_state:
continue
+ # For paranoia we ensure that these events are marked as
+ # non-outliers
+ assert(not event.internal_metadata.is_outlier())
+
# We store these one at a time since each event depends on the
# previous to work out the state.
# TODO: We can probably do something more clever here.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..c762b58902 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -436,10 +436,11 @@ class EventCreationHandler(object):
if event.is_state():
prev_state = yield self.deduplicate_state_event(event, context)
- logger.info(
- "Not bothering to persist duplicate state event %s", event.event_id,
- )
if prev_state is not None:
+ logger.info(
+ "Not bothering to persist state event %s duplicated by %s",
+ event.event_id, prev_state.event_id,
+ )
defer.returnValue(prev_state)
yield self.handle_new_client_event(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 9d257ecf31..e4fdae9266 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -136,7 +136,11 @@ class PaginationHandler(object):
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
- logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ f = Failure()
+ logger.error(
+ "[purge] failed",
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
self._purges_in_progress_by_room.discard(room_id)
@@ -254,7 +258,7 @@ class PaginationHandler(object):
})
state = None
- if event_filter and event_filter.lazy_load_members():
+ if event_filter and event_filter.lazy_load_members() and len(events) > 0:
# TODO: remove redundant members
# FIXME: we also care about invite targets etc.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4c2690ba26..696469732c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,8 +16,8 @@ import logging
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
-from synapse.util import logcontext
from ._base import BaseHandler
@@ -59,7 +59,9 @@ class ReceiptsHandler(BaseHandler):
if is_new:
# fire off a process in the background to send the receipt to
# remote servers
- self._push_remotes([receipt])
+ run_as_background_process(
+ 'push_receipts_to_remotes', self._push_remotes, receipt
+ )
@defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
@@ -125,44 +127,42 @@ class ReceiptsHandler(BaseHandler):
defer.returnValue(True)
- @logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
- def _push_remotes(self, receipts):
- """Given a list of receipts, works out which remote servers should be
+ def _push_remotes(self, receipt):
+ """Given a receipt, works out which remote servers should be
poked and pokes them.
"""
try:
- # TODO: Some of this stuff should be coallesced.
- for receipt in receipts:
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": event_ids,
- "data": data,
- }
+ # TODO: optimise this to move some of the work to the workers.
+ room_id = receipt["room_id"]
+ receipt_type = receipt["receipt_type"]
+ user_id = receipt["user_id"]
+ event_ids = receipt["event_ids"]
+ data = receipt["data"]
+
+ users = yield self.state.get_current_user_in_room(room_id)
+ remotedomains = set(get_domain_from_id(u) for u in users)
+ remotedomains = remotedomains.copy()
+ remotedomains.discard(self.server_name)
+
+ logger.debug("Sending receipt to: %r", remotedomains)
+
+ for domain in remotedomains:
+ self.federation.send_edu(
+ destination=domain,
+ edu_type="m.receipt",
+ content={
+ room_id: {
+ receipt_type: {
+ user_id: {
+ "event_ids": event_ids,
+ "data": data,
}
- },
+ }
},
- key=(room_id, receipt_type, user_id),
- )
+ },
+ key=(room_id, receipt_type, user_id),
+ )
except Exception:
logger.exception("Error pushing receipts to remote servers")
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 24a4cb5a83..c0e06929bd 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -460,7 +460,7 @@ class RegistrationHandler(BaseHandler):
lines = response.split('\n')
json = {
"valid": lines[0] == 'true',
- "error_url": "http://www.google.com/recaptcha/api/challenge?" +
+ "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" +
"error=%s" % lines[1]
}
defer.returnValue(json)
@@ -471,7 +471,7 @@ class RegistrationHandler(BaseHandler):
Used only by c/s api v1
"""
data = yield self.captcha_client.post_urlencoded_get_raw(
- "http://www.google.com:80/recaptcha/api/verify",
+ "http://www.recaptcha.net:80/recaptcha/api/verify",
args={
'privatekey': private_key,
'remoteip': ip_addr,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 13e212d669..afa508d729 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -50,16 +50,17 @@ class RoomListHandler(BaseHandler):
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
- network_tuple=EMPTY_THIRD_PARTY_ID,):
+ network_tuple=EMPTY_THIRD_PARTY_ID,
+ from_federation=False):
"""Generate a local public room list.
There are multiple different lists: the main one plus one per third
party network. A client can ask for a specific list or to return all.
Args:
- limit (int)
- since_token (str)
- search_filter (dict)
+ limit (int|None)
+ since_token (str|None)
+ search_filter (dict|None)
network_tuple (ThirdPartyInstanceID): Which public list to use.
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
@@ -87,14 +88,30 @@ class RoomListHandler(BaseHandler):
return self.response_cache.wrap(
key,
self._get_public_room_list,
- limit, since_token, network_tuple=network_tuple,
+ limit, since_token,
+ network_tuple=network_tuple, from_federation=from_federation,
)
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
search_filter=None,
network_tuple=EMPTY_THIRD_PARTY_ID,
+ from_federation=False,
timeout=None,):
+ """Generate a public room list.
+ Args:
+ limit (int|None): Maximum amount of rooms to return.
+ since_token (str|None)
+ search_filter (dict|None): Dictionary to filter rooms by.
+ network_tuple (ThirdPartyInstanceID): Which public list to use.
+ This can be (None, None) to indicate the main list, or a particular
+ appservice and network id to use an appservice specific one.
+ Setting to None returns all public rooms across all lists.
+ from_federation (bool): Whether this request originated from a
+ federating server or a client. Used for room filtering.
+ timeout (int|None): Amount of seconds to wait for a response before
+ timing out.
+ """
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
else:
@@ -217,7 +234,8 @@ class RoomListHandler(BaseHandler):
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
- chunk, limit, search_filter
+ chunk, limit, search_filter,
+ from_federation=from_federation,
),
batch, 5,
)
@@ -288,23 +306,51 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
- search_filter):
+ search_filter, from_federation=False):
"""Generate the entry for a room in the public room list and append it
to the `chunk` if it matches the search filter
+
+ Args:
+ room_id (str): The ID of the room.
+ num_joined_users (int): The number of joined users in the room.
+ chunk (list)
+ limit (int|None): Maximum amount of rooms to display. Function will
+ return if length of chunk is greater than limit + 1.
+ search_filter (dict|None)
+ from_federation (bool): Whether this request originated from a
+ federating server or a client. Used for room filtering.
"""
if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it.
return
result = yield self.generate_room_entry(room_id, num_joined_users)
+ if not result:
+ return
+
+ if from_federation and not result.get("m.federate", True):
+ # This is a room that other servers cannot join. Do not show them
+ # this room.
+ return
- if result and _matches_room_entry(result, search_filter):
+ if _matches_room_entry(result, search_filter):
chunk.append(result)
@cachedInlineCallbacks(num_args=1, cache_context=True)
def generate_room_entry(self, room_id, num_joined_users, cache_context,
with_alias=True, allow_private=False):
"""Returns the entry for a room
+
+ Args:
+ room_id (str): The room's ID.
+ num_joined_users (int): Number of users in the room.
+ cache_context: Information for cached responses.
+ with_alias (bool): Whether to return the room's aliases in the result.
+ allow_private (bool): Whether invite-only rooms should be shown.
+
+ Returns:
+ Deferred[dict|None]: Returns a room entry as a dictionary, or None if this
+ room was determined not to be shown publicly.
"""
result = {
"room_id": room_id,
@@ -318,6 +364,7 @@ class RoomListHandler(BaseHandler):
event_map = yield self.store.get_events([
event_id for key, event_id in iteritems(current_state_ids)
if key[0] in (
+ EventTypes.Create,
EventTypes.JoinRules,
EventTypes.Name,
EventTypes.Topic,
@@ -334,12 +381,17 @@ class RoomListHandler(BaseHandler):
}
# Double check that this is actually a public room.
+
join_rules_event = current_state.get((EventTypes.JoinRules, ""))
if join_rules_event:
join_rule = join_rules_event.content.get("join_rule", None)
if not allow_private and join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
+ # Return whether this room is open to federation users or not
+ create_event = current_state.get((EventTypes.Create, ""))
+ result["m.federate"] = create_event.content.get("m.federate", True)
+
if with_alias:
aliases = yield self.store.get_aliases_for_room(
room_id, on_invalidate=cache_context.invalidate
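
Note: the `m.federate` handling above reads the room's `m.room.create` event content, where federation defaults to allowed when the field is absent. A tiny sketch of the filtering predicate:

```python
def visible_over_federation(create_event_content):
    """Hypothetical predicate: m.federate defaults to True when the
    m.room.create event omits it."""
    return create_event_content.get("m.federate", True)


assert visible_over_federation({}) is True
assert visible_over_federation({"m.federate": False}) is False
```
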
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 384d8a37a2..1334c630cc 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -68,9 +68,13 @@ class MatrixFederationAgent(object):
TLS policy to use for fetching .well-known files. None to use a default
(browser-like) implementation.
- srv_resolver (SrvResolver|None):
+ _srv_resolver (SrvResolver|None):
SRVResolver impl to use for looking up SRV records. None to use a default
implementation.
+
+ _well_known_cache (TTLCache|None):
+ TTLCache impl for storing cached well-known lookups. None to use a default
+ implementation.
"""
def __init__(
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 6c67a25a11..16fb7935da 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -169,18 +169,18 @@ def _return_html_error(f, request):
)
else:
logger.error(
- "Failed handle request %r: %s",
+ "Failed handle request %r",
request,
- f.getTraceback().rstrip(),
+ exc_info=(f.type, f.value, f.getTracebackObject()),
)
else:
code = http_client.INTERNAL_SERVER_ERROR
msg = "Internal server error"
logger.error(
- "Failed handle request %r: %s",
+ "Failed handle request %r",
request,
- f.getTraceback().rstrip(),
+ exc_info=(f.type, f.value, f.getTracebackObject()),
)
body = HTML_ERROR_TEMPLATE.format(
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 98d8d9560b..e65f8c63d3 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -32,9 +32,25 @@ if six.PY3:
logger = logging.getLogger(__name__)
-http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
+http_push_processed_counter = Counter(
+ "synapse_http_httppusher_http_pushes_processed",
+ "Number of push notifications successfully sent",
+)
-http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
+http_push_failed_counter = Counter(
+ "synapse_http_httppusher_http_pushes_failed",
+ "Number of push notifications which failed",
+)
+
+http_badges_processed_counter = Counter(
+ "synapse_http_httppusher_badge_updates_processed",
+ "Number of badge updates successfully sent",
+)
+
+http_badges_failed_counter = Counter(
+ "synapse_http_httppusher_badge_updates_failed",
+ "Number of badge updates which failed",
+)
class HttpPusher(object):
@@ -81,6 +97,11 @@ class HttpPusher(object):
pusherdict['pushkey'],
)
+ if self.data is None:
+ raise PusherConfigException(
+ "data can not be null for HTTP pusher"
+ )
+
if 'url' not in self.data:
raise PusherConfigException(
"'url' required in data for HTTP pusher"
@@ -346,6 +367,10 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _send_badge(self, badge):
+ """
+ Args:
+ badge (int): number of unread messages
+ """
logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
@@ -366,14 +391,11 @@ class HttpPusher(object):
}
}
try:
- resp = yield self.http_client.post_json_get_json(self.url, d)
+ yield self.http_client.post_json_get_json(self.url, d)
+ http_badges_processed_counter.inc()
except Exception as e:
logger.warning(
"Failed to send badge count to %s: %s %s",
self.name, type(e), e,
)
- defer.returnValue(False)
- rejected = []
- if 'rejected' in resp:
- rejected = resp['rejected']
- defer.returnValue(rejected)
+ http_badges_failed_counter.inc()
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 368d5094be..b33f2a357b 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -56,7 +56,7 @@ class PusherFactory(object):
f = self.pusher_types.get(kind, None)
if not f:
return None
- logger.info("creating %s pusher for %r", kind, pusherdict)
+ logger.debug("creating %s pusher for %r", kind, pusherdict)
return f(self.hs, pusherdict)
def _create_email_pusher(self, _hs, pusherdict):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 5a4e73ccd6..abf1a1a9c1 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,6 +19,7 @@ import logging
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.push import PusherConfigException
from synapse.push.pusher import PusherFactory
logger = logging.getLogger(__name__)
@@ -140,6 +141,10 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
+ if not self.pushers:
+ # nothing to do here.
+ return
+
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@@ -155,6 +160,10 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ if not self.pushers:
+ # nothing to do here.
+ return
+
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@@ -214,6 +223,15 @@ class PusherPool:
"""
try:
p = self.pusher_factory.create_pusher(pusherdict)
+ except PusherConfigException as e:
+ logger.warning(
+ "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
+ pusherdict.get('user_name'),
+ pusherdict.get('app_id'),
+ pusherdict.get('pushkey'),
+ e,
+ )
+ return
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
return
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 1353a32d00..817d1f67f9 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -59,12 +59,7 @@ class BaseSlavedStore(SQLBaseStore):
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
- try:
- getattr(self, row.cache_func).invalidate(tuple(row.keys))
- except AttributeError:
- # We probably haven't pulled in the cache in this worker,
- # which is fine.
- pass
+ self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 92447b00d4..9e530defe0 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -54,8 +54,11 @@ class SlavedPresenceStore(BaseSlavedStore):
def stream_positions(self):
result = super(SlavedPresenceStore, self).stream_positions()
- position = self._presence_id_gen.get_current_token()
- result["presence"] = position
+
+ if self.hs.config.use_presence:
+ position = self._presence_id_gen.get_current_token()
+ result["presence"] = position
+
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 586dddb40b..e558f90e1a 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
Accepts a handler that will be called when new data is available or data
is required.
"""
- maxDelay = 5 # Try at least once every N seconds
+ maxDelay = 30 # Maximum delay between reconnection attempts, in seconds
def __init__(self, hs, client_name, handler):
self.client_name = client_name
@@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory):
def buildProtocol(self, addr):
logger.info("Connected to replication: %r", addr)
- self.resetDelay()
return ClientReplicationStreamProtocol(
self.client_name, self.server_name, self._clock, self.handler
)
@@ -90,15 +89,18 @@ class ReplicationClientHandler(object):
# Used for tests.
self.awaiting_syncs = {}
+ # The factory used to create connections.
+ self.factory = None
+
def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
- factory = ReplicationClientFactory(hs, client_name, self)
+ self.factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
- hs.get_reactor().connectTCP(host, port, factory)
+ hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
@@ -140,6 +142,7 @@ class ReplicationClientHandler(object):
args["account_data"] = user_account_data
elif room_account_data:
args["account_data"] = room_account_data
+
return args
def get_currently_syncing_users(self):
@@ -204,3 +207,14 @@ class ReplicationClientHandler(object):
for cmd in self.pending_commands:
connection.send_command(cmd)
self.pending_commands = []
+
+ def finished_connecting(self):
+ """Called when we have successfully subscribed and caught up to all
+ streams we're interested in.
+ """
+ logger.info("Finished connecting to server")
+
+ # We don't reset the delay any earlier as otherwise if there is a
+ # problem during start up we'll end up tight looping connecting to the
+ # server.
+ self.factory.resetDelay()
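
Note: the comment above explains why `resetDelay()` moved out of `buildProtocol`: with a `ReconnectingClientFactory`, resetting the backoff on every TCP connect means a server that accepts and then immediately drops connections gets hammered in a tight loop. A sketch of the corrected shape (folding the handler's `finished_connecting` callback onto the factory for brevity):

```python
from twisted.internet.protocol import Protocol, ReconnectingClientFactory


class ReplicationClient(Protocol):
    def connectionMade(self):
        # Deliberately do NOT reset the retry delay here: a successful TCP
        # connect says nothing about the replication catch-up.
        pass


class ReplicationClientFactory(ReconnectingClientFactory):
    maxDelay = 30  # cap the exponential backoff at 30 seconds

    def buildProtocol(self, addr):
        return ReplicationClient()

    def finished_connecting(self):
        # Called once all subscribed streams are caught up; only now is
        # the connection known-good, so only now reset the backoff.
        self.resetDelay()
```
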
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 327556f6a1..2098c32a77 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -127,8 +127,11 @@ class RdataCommand(Command):
class PositionCommand(Command):
- """Sent by the client to tell the client the stream postition without
+ """Sent by the server to tell the client the stream postition without
needing to send an RDATA.
+
+ Sent once all missing updates for a stream have been sent to the client,
+ at which point it is up to date.
"""
NAME = "POSITION"
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 0b3fe6cbf5..49ae5b3355 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -268,7 +268,17 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
- self.sendLine(string.encode("utf-8"))
+ encoded_string = string.encode("utf-8")
+
+ if len(encoded_string) > self.MAX_LENGTH:
+ raise Exception(
+ "Failed to send command %s as too long (%d > %d)" % (
+ cmd.NAME,
+ len(encoded_string), self.MAX_LENGTH,
+ )
+ )
+
+ self.sendLine(encoded_string)
self.last_sent_command = self.clock.time_msec()
@@ -361,6 +371,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def id(self):
return "%s-%s" % (self.name, self.conn_id)
+ def lineLengthExceeded(self, line):
+ """Called when we receive a line that is above the maximum line length
+ """
+ self.send_error("Line length exceeded")
+
class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
@@ -511,6 +526,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.server_name = server_name
self.handler = handler
+ # Set of stream names that have been subscribe to, but haven't yet
+ # caught up with. This is used to track when the client has been fully
+ # connected to the remote.
+ self.streams_connecting = set()
+
# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {}
@@ -533,6 +553,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
+ # This will happen if we don't actually subscribe to any streams
+ if not self.streams_connecting:
+ self.handler.finished_connecting()
+
def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
@@ -562,6 +586,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
return self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
+ # When we get a `POSITION` command it means we've finished getting
+ # missing updates for the given stream, and are now up to date.
+ self.streams_connecting.discard(cmd.stream_name)
+ if not self.streams_connecting:
+ self.handler.finished_connecting()
+
return self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
@@ -578,6 +608,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.id(), stream_name, token
)
+ self.streams_connecting.add(stream_name)
+
self.send_command(ReplicateCommand(stream_name, token))
def on_connection_closed(self):
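
Note: Twisted's `LineOnlyReceiver` only enforces `MAX_LENGTH` (16384 bytes by default) on received lines, so the hunk above adds the matching check on the send side before `sendLine`. The guard in isolation, with a hypothetical `encode_command` helper:

```python
MAX_LENGTH = 16384  # LineOnlyReceiver's default maximum line length


def encode_command(cmd_name, data):
    """Hypothetical helper: encode a replication command, refusing to
    produce a line the peer would reject."""
    line = "%s %s" % (cmd_name, data)
    encoded = line.encode("utf-8")
    if len(encoded) > MAX_LENGTH:
        raise Exception(
            "Failed to send command %s: command too long (%d > %d)"
            % (cmd_name, len(encoded), MAX_LENGTH)
        )
    return encoded


assert encode_command("PING", "12345") == b"PING 12345"
```
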
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index f7bb710642..ac035c7735 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -33,7 +33,7 @@ RECAPTCHA_TEMPLATE = """
<title>Authentication</title>
<meta name='viewport' content='width=device-width, initial-scale=1,
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
-<script src="https://www.google.com/recaptcha/api.js"
+<script src="https://www.recaptcha.net/recaptcha/api.js"
async defer></script>
<script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index efe42a429d..fece1ef0b8 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -133,8 +134,15 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam
logger.debug("Responding to media request with responder %s")
add_file_headers(request, media_type, file_size, upload_name)
- with responder:
- yield responder.write_to_consumer(request)
+ try:
+ with responder:
+ yield responder.write_to_consumer(request)
+ except Exception as e:
+ # The majority of the time this will be due to the client having gone
+ # away. Unfortunately, Twisted simply throws a generic exception at us
+ # in that case.
+ logger.warning("Failed to write to consumer: %s %s", type(e), e)
+
finish_request(request)
@@ -206,8 +214,7 @@ def get_filename_from_headers(headers):
Content-Disposition HTTP header.
Args:
- headers (twisted.web.http_headers.Headers): The HTTP
- request headers.
+ headers (dict[bytes, list[bytes]]): The HTTP request headers.
Returns:
A Unicode string of the filename, or None.
@@ -218,23 +225,12 @@ def get_filename_from_headers(headers):
if not content_disposition[0]:
return
- # dict of unicode: bytes, corresponding to the key value sections of the
- # Content-Disposition header.
- params = {}
- parts = content_disposition[0].split(b";")
- for i in parts:
- # Split into key-value pairs, if able
- # We don't care about things like `inline`, so throw it out
- if b"=" not in i:
- continue
-
- key, value = i.strip().split(b"=")
- params[key.decode('ascii')] = value
+ _, params = _parse_header(content_disposition[0])
upload_name = None
# First check if there is a valid UTF-8 filename
- upload_name_utf8 = params.get("filename*", None)
+ upload_name_utf8 = params.get(b"filename*", None)
if upload_name_utf8:
if upload_name_utf8.lower().startswith(b"utf-8''"):
upload_name_utf8 = upload_name_utf8[7:]
@@ -260,12 +256,68 @@ def get_filename_from_headers(headers):
# If there isn't check for an ascii name.
if not upload_name:
- upload_name_ascii = params.get("filename", None)
+ upload_name_ascii = params.get(b"filename", None)
if upload_name_ascii and is_ascii(upload_name_ascii):
- # Make sure there's no %-quoted bytes. If there is, reject it as
- # non-valid ASCII.
- if b"%" not in upload_name_ascii:
- upload_name = upload_name_ascii.decode('ascii')
+ upload_name = upload_name_ascii.decode('ascii')
# This may be None here, indicating we did not find a matching name.
return upload_name
+
+
+def _parse_header(line):
+ """Parse a Content-type like header.
+
+ Cargo-culted from `cgi`, but works on bytes rather than strings.
+
+ Args:
+ line (bytes): header to be parsed
+
+ Returns:
+ Tuple[bytes, dict[bytes, bytes]]:
+ the main content-type, followed by the parameter dictionary
+ """
+ parts = _parseparam(b';' + line)
+ key = next(parts)
+ pdict = {}
+ for p in parts:
+ i = p.find(b'=')
+ if i >= 0:
+ name = p[:i].strip().lower()
+ value = p[i + 1:].strip()
+
+ # strip double-quotes
+ if len(value) >= 2 and value[0:1] == value[-1:] == b'"':
+ value = value[1:-1]
+ value = value.replace(b'\\\\', b'\\').replace(b'\\"', b'"')
+ pdict[name] = value
+
+ return key, pdict
+
+
+def _parseparam(s):
+ """Generator which splits the input on ;, respecting double-quoted sequences
+
+ Cargo-culted from `cgi`, but works on bytes rather than strings.
+
+ Args:
+ s (bytes): header to be parsed
+
+ Returns:
+ Iterable[bytes]: the split input
+ """
+ while s[:1] == b';':
+ s = s[1:]
+
+ # look for the next ;
+ end = s.find(b';')
+
+ # if there is an odd number of " marks between here and the next ;, skip to the
+ # next ; instead
+ while end > 0 and (s.count(b'"', 0, end) - s.count(b'\\"', 0, end)) % 2:
+ end = s.find(b';', end + 1)
+
+ if end < 0:
+ end = len(s)
+ f = s[:end]
+ yield f.strip()
+ s = s[end:]
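
Note: a usage sketch for the byte-safe `_parse_header` defined above. Unlike the old naive `split(b";")` loop, a quoted semicolon stays inside the filename instead of splitting the parameter:

```python
key, params = _parse_header(b'inline; filename="foo; bar.png"')
assert key == b"inline"
assert params == {b"filename": b"foo; bar.png"}
```
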
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 06cd083a74..fb8df56cd5 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -7,9 +7,9 @@ import synapse.handlers.auth
import synapse.handlers.deactivate_account
import synapse.handlers.device
import synapse.handlers.e2e_keys
+import synapse.handlers.message
import synapse.handlers.room
import synapse.handlers.room_member
-import synapse.handlers.message
import synapse.handlers.set_password
import synapse.rest.media.v1.media_repository
import synapse.server_notices.server_notices_manager
diff --git a/synapse/static/client/register/index.html b/synapse/static/client/register/index.html
index 886f2edd1f..6edc4deb03 100644
--- a/synapse/static/client/register/index.html
+++ b/synapse/static/client/register/index.html
@@ -4,7 +4,7 @@
<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
<link rel="stylesheet" href="style.css">
<script src="js/jquery-2.1.3.min.js"></script>
-<script src="https://www.google.com/recaptcha/api/js/recaptcha_ajax.js"></script>
+<script src="https://www.recaptcha.net/recaptcha/api/js/recaptcha_ajax.js"></script>
<script src="register_config.js"></script>
<script src="js/register.js"></script>
</head>
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3d895da43c..a0333d5309 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -30,6 +30,7 @@ from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id
+from synapse.util import batch_iter
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode
@@ -1327,10 +1328,16 @@ class SQLBaseStore(object):
"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)
- keys = itertools.chain([room_id], members_changed)
- self._send_invalidation_to_replication(
- txn, _CURRENT_STATE_CACHE_NAME, keys,
- )
+ # We need to be careful that the size of the `members_changed` list
+ # isn't so large that it causes problems sending over replication, so we
+ # send them in chunks.
+ # Max line length is 16K, and max user ID length is 255, so 50 should
+ # be safe.
+ for chunk in batch_iter(members_changed, 50):
+ keys = itertools.chain([room_id], chunk)
+ self._send_invalidation_to_replication(
+ txn, _CURRENT_STATE_CACHE_NAME, keys,
+ )
def _invalidate_state_caches(self, room_id, members_changed):
"""Invalidates caches that are based on the current state, but does
@@ -1342,15 +1349,43 @@ class SQLBaseStore(object):
changed
"""
for member in members_changed:
- self.get_rooms_for_user_with_stream_ordering.invalidate((member,))
+ self._attempt_to_invalidate_cache(
+ "get_rooms_for_user_with_stream_ordering", (member,),
+ )
for host in set(get_domain_from_id(u) for u in members_changed):
- self.is_host_joined.invalidate((room_id, host))
- self.was_host_joined.invalidate((room_id, host))
+ self._attempt_to_invalidate_cache(
+ "is_host_joined", (room_id, host,),
+ )
+ self._attempt_to_invalidate_cache(
+ "was_host_joined", (room_id, host,),
+ )
+
+ self._attempt_to_invalidate_cache(
+ "get_users_in_room", (room_id,),
+ )
+ self._attempt_to_invalidate_cache(
+ "get_room_summary", (room_id,),
+ )
+ self._attempt_to_invalidate_cache(
+ "get_current_state_ids", (room_id,),
+ )
+
+ def _attempt_to_invalidate_cache(self, cache_name, key):
+ """Attempts to invalidate the cache of the given name, ignoring if the
+ cache doesn't exist. Mainly used for invalidating caches on workers,
+ where they may not have the cache.
- self.get_users_in_room.invalidate((room_id,))
- self.get_room_summary.invalidate((room_id,))
- self.get_current_state_ids.invalidate((room_id,))
+ Args:
+ cache_name (str)
+ key (tuple)
+ """
+ try:
+ getattr(self, cache_name).invalidate(key)
+ except AttributeError:
+ # We probably haven't pulled in the cache in this worker,
+ # which is fine.
+ pass
def _send_invalidation_to_replication(self, txn, cache_name, keys):
"""Notifies replication that given cache has been invalidated.
@@ -1568,6 +1603,14 @@ class SQLBaseStore(object):
return cls.cursor_to_dict(txn)
+ @property
+ def database_engine_name(self):
+ return self.database_engine.module.__name__
+
+ def get_server_version(self):
+ """Returns a string describing the server version number"""
+ return self.database_engine.server_version
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 4004427c7b..dc3238501c 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -23,6 +23,7 @@ class PostgresEngine(object):
self.module = database_module
self.module.extensions.register_type(self.module.extensions.UNICODE)
self.synchronous_commit = database_config.get("synchronous_commit", True)
+ self._version = None # unknown as yet
def check_database(self, txn):
txn.execute("SHOW SERVER_ENCODING")
@@ -87,3 +88,27 @@ class PostgresEngine(object):
"""
txn.execute("SELECT nextval('state_group_id_seq')")
return txn.fetchone()[0]
+
+ @property
+ def server_version(self):
+ """Returns a string giving the server version. For example: '8.1.5'
+
+ Returns:
+ string
+ """
+ # note that this is a bit of a hack because it relies on on_new_connection
+ # having been called at least once. Still, that should be a safe bet here.
+ numver = self._version
+ assert numver is not None
+
+ # https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION
+ if numver >= 100000:
+ return "%i.%i" % (
+ numver / 10000, numver % 10000,
+ )
+ else:
+ return "%i.%i.%i" % (
+ numver / 10000,
+ (numver % 10000) / 100,
+ numver % 100,
+ )
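
Note: a worked example of the libpq-style version arithmetic above. The `//` makes the integer division explicit; the original `/` relies on py2 semantics, though `%i` also truncates floats on py3:

```python
def format_pg_version(numver):
    """Hypothetical helper mirroring the arithmetic above."""
    if numver >= 100000:
        # from PostgreSQL 10: MAJOR * 10000 + MINOR
        return "%i.%i" % (numver // 10000, numver % 10000)
    # before PostgreSQL 10: MAJOR * 10000 + MINOR * 100 + PATCH
    return "%i.%i.%i" % (
        numver // 10000, (numver % 10000) // 100, numver % 100,
    )


assert format_pg_version(100002) == "10.2"
assert format_pg_version(90605) == "9.6.5"
```
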
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 059ab81055..1bcd5b99a4 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -70,6 +70,15 @@ class Sqlite3Engine(object):
self._current_state_group_id += 1
return self._current_state_group_id
+ @property
+ def server_version(self):
+ """Gets a string giving the server version. For example: '3.22.0'
+
+ Returns:
+ string
+ """
+ return "%i.%i.%i" % self.module.sqlite_version_info
+
# Following functions taken from: https://github.com/coleifer/peewee
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9b9572890b..9b6c28892c 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -295,6 +295,39 @@ class RegistrationWorkerStore(SQLBaseStore):
return ret['user_id']
return None
+ @defer.inlineCallbacks
+ def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
+ yield self._simple_upsert("user_threepids", {
+ "medium": medium,
+ "address": address,
+ }, {
+ "user_id": user_id,
+ "validated_at": validated_at,
+ "added_at": added_at,
+ })
+
+ @defer.inlineCallbacks
+ def user_get_threepids(self, user_id):
+ ret = yield self._simple_select_list(
+ "user_threepids", {
+ "user_id": user_id
+ },
+ ['medium', 'address', 'validated_at', 'added_at'],
+ 'user_get_threepids'
+ )
+ defer.returnValue(ret)
+
+ def user_delete_threepid(self, user_id, medium, address):
+ return self._simple_delete(
+ "user_threepids",
+ keyvalues={
+ "user_id": user_id,
+ "medium": medium,
+ "address": address,
+ },
+ desc="user_delete_threepids",
+ )
+
class RegistrationStore(RegistrationWorkerStore,
background_updates.BackgroundUpdateStore):
@@ -633,39 +666,6 @@ class RegistrationStore(RegistrationWorkerStore,
defer.returnValue(res if res else False)
@defer.inlineCallbacks
- def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
- yield self._simple_upsert("user_threepids", {
- "medium": medium,
- "address": address,
- }, {
- "user_id": user_id,
- "validated_at": validated_at,
- "added_at": added_at,
- })
-
- @defer.inlineCallbacks
- def user_get_threepids(self, user_id):
- ret = yield self._simple_select_list(
- "user_threepids", {
- "user_id": user_id
- },
- ['medium', 'address', 'validated_at', 'added_at'],
- 'user_get_threepids'
- )
- defer.returnValue(ret)
-
- def user_delete_threepid(self, user_id, medium, address):
- return self._simple_delete(
- "user_threepids",
- keyvalues={
- "user_id": user_id,
- "medium": medium,
- "address": address,
- },
- desc="user_delete_threepids",
- )
-
- @defer.inlineCallbacks
def save_or_get_3pid_guest_access_token(
self, medium, address, access_token, inviter_user_id
):
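
Note: moving the threepid methods from `RegistrationStore` up to `RegistrationWorkerStore` is what lets the `client_reader` worker serve the `ThreepidRestServlet` registered at the top of this diff. A sketch of the worker-store inheritance pattern, with illustrative bodies only:

```python
class RegistrationWorkerStore(object):
    # read paths usable from any process, including the client_reader
    # worker that now registers ThreepidRestServlet
    def user_get_threepids(self, user_id):
        raise NotImplementedError("illustrative only")


class RegistrationStore(RegistrationWorkerStore):
    # master-only writes; the master process sees the union of both classes
    def register(self, user_id):
        raise NotImplementedError("illustrative only")
```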