diff --git a/synapse/config/server.py b/synapse/config/server.py
index e763e19e15..334921d421 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -585,6 +585,22 @@ class ServerConfig(Config):
# Monthly Active User Blocking
#
+ # Used in cases where the admin or server owner wants to limit to the
+ # number of monthly active users.
+ #
+ # 'limit_usage_by_mau' disables/enables monthly active user blocking. When
+ # anabled and a limit is reached the server returns a 'ResourceLimitError'
+ # with error type Codes.RESOURCE_LIMIT_EXCEEDED
+ #
+ # 'max_mau_value' is the hard limit of monthly active users above which
+ # the server will start blocking user actions.
+ #
+ # 'mau_trial_days' is a means to add a grace period for active users. It
+ # means that users must be active for this number of days before they
+ # can be considered active and guards against the case where lots of users
+ # sign up in a short space of time never to return after their initial
+ # session.
+ #
#limit_usage_by_mau: False
#max_mau_value: 50
#mau_trial_days: 2
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 72dd5926f9..94a53d05f9 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -107,7 +107,7 @@ class TlsConfig(Config):
certs = []
for ca_file in custom_ca_list:
logger.debug("Reading custom CA certificate file: %s", ca_file)
- content = self.read_file(ca_file)
+ content = self.read_file(ca_file, "federation_custom_ca_list")
# Parse the CA certificates
try:
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index e94e71bdad..2b6b5913bc 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -60,9 +60,9 @@ logger = logging.getLogger(__name__)
@attr.s(slots=True, cmp=False)
-class VerifyKeyRequest(object):
+class VerifyJsonRequest(object):
"""
- A request for a verify key to verify a JSON object.
+ A request to verify a JSON object.
Attributes:
server_name(str): The name of the server to verify against.
@@ -85,11 +85,15 @@ class VerifyKeyRequest(object):
"""
server_name = attr.ib()
- key_ids = attr.ib()
json_object = attr.ib()
minimum_valid_until_ts = attr.ib()
+ request_name = attr.ib()
+ key_ids = attr.ib(init=False)
key_ready = attr.ib(default=attr.Factory(defer.Deferred))
+ def __attrs_post_init__(self):
+ self.key_ids = signature_ids(self.json_object, self.server_name)
+
class KeyLookupError(ValueError):
pass
@@ -114,7 +118,9 @@ class Keyring(object):
# These are regular, logcontext-agnostic Deferreds.
self.key_downloads = {}
- def verify_json_for_server(self, server_name, json_object, validity_time):
+ def verify_json_for_server(
+ self, server_name, json_object, validity_time, request_name
+ ):
"""Verify that a JSON object has been signed by a given server
Args:
@@ -125,24 +131,31 @@ class Keyring(object):
validity_time (int): timestamp at which we require the signing key to
be valid. (0 implies we don't care)
+ request_name (str): an identifier for this json object (eg, an event id)
+ for logging.
+
Returns:
Deferred[None]: completes if the the object was correctly signed, otherwise
errbacks with an error
"""
- req = server_name, json_object, validity_time
-
- return logcontext.make_deferred_yieldable(
- self.verify_json_objects_for_server((req,))[0]
- )
+ req = VerifyJsonRequest(server_name, json_object, validity_time, request_name)
+ requests = (req,)
+ return logcontext.make_deferred_yieldable(self._verify_objects(requests)[0])
def verify_json_objects_for_server(self, server_and_json):
"""Bulk verifies signatures of json objects, bulk fetching keys as
necessary.
Args:
- server_and_json (iterable[Tuple[str, dict, int]):
- Iterable of triplets of (server_name, json_object, validity_time)
- validity_time is a timestamp at which the signing key must be valid.
+ server_and_json (iterable[Tuple[str, dict, int, str]):
+ Iterable of (server_name, json_object, validity_time, request_name)
+ tuples.
+
+ validity_time is a timestamp at which the signing key must be
+ valid.
+
+ request_name is an identifier for this json object (eg, an event id)
+ for logging.
Returns:
List<Deferred[None]>: for each input triplet, a deferred indicating success
@@ -150,38 +163,54 @@ class Keyring(object):
server_name. The deferreds run their callbacks in the sentinel
logcontext.
"""
- # a list of VerifyKeyRequests
- verify_requests = []
+ return self._verify_objects(
+ VerifyJsonRequest(server_name, json_object, validity_time, request_name)
+ for server_name, json_object, validity_time, request_name in server_and_json
+ )
+
+ def _verify_objects(self, verify_requests):
+ """Does the work of verify_json_[objects_]for_server
+
+
+ Args:
+ verify_requests (iterable[VerifyJsonRequest]):
+ Iterable of verification requests.
+
+ Returns:
+ List<Deferred[None]>: for each input item, a deferred indicating success
+ or failure to verify each json object's signature for the given
+ server_name. The deferreds run their callbacks in the sentinel
+ logcontext.
+ """
+ # a list of VerifyJsonRequests which are awaiting a key lookup
+ key_lookups = []
handle = preserve_fn(_handle_key_deferred)
- def process(server_name, json_object, validity_time):
+ def process(verify_request):
"""Process an entry in the request list
- Given a (server_name, json_object, validity_time) triplet from the request
- list, adds a key request to verify_requests, and returns a deferred which
+ Adds a key request to key_lookups, and returns a deferred which
will complete or fail (in the sentinel context) when verification completes.
"""
- key_ids = signature_ids(json_object, server_name)
-
- if not key_ids:
+ if not verify_request.key_ids:
return defer.fail(
SynapseError(
- 400, "Not signed by %s" % (server_name,), Codes.UNAUTHORIZED
+ 400,
+ "Not signed by %s" % (verify_request.server_name,),
+ Codes.UNAUTHORIZED,
)
)
logger.debug(
- "Verifying for %s with key_ids %s, min_validity %i",
- server_name,
- key_ids,
- validity_time,
+ "Verifying %s for %s with key_ids %s, min_validity %i",
+ verify_request.request_name,
+ verify_request.server_name,
+ verify_request.key_ids,
+ verify_request.minimum_valid_until_ts,
)
# add the key request to the queue, but don't start it off yet.
- verify_request = VerifyKeyRequest(
- server_name, key_ids, json_object, validity_time
- )
- verify_requests.append(verify_request)
+ key_lookups.append(verify_request)
# now run _handle_key_deferred, which will wait for the key request
# to complete and then do the verification.
@@ -190,13 +219,10 @@ class Keyring(object):
# wrap it with preserve_fn (aka run_in_background)
return handle(verify_request)
- results = [
- process(server_name, json_object, validity_time)
- for server_name, json_object, validity_time in server_and_json
- ]
+ results = [process(r) for r in verify_requests]
- if verify_requests:
- run_in_background(self._start_key_lookups, verify_requests)
+ if key_lookups:
+ run_in_background(self._start_key_lookups, key_lookups)
return results
@@ -207,7 +233,7 @@ class Keyring(object):
Once each fetch completes, verify_request.key_ready will be resolved.
Args:
- verify_requests (List[VerifyKeyRequest]):
+ verify_requests (List[VerifyJsonRequest]):
"""
try:
@@ -308,7 +334,7 @@ class Keyring(object):
with a SynapseError if none of the keys are found.
Args:
- verify_requests (list[VerifyKeyRequest]): list of verify requests
+ verify_requests (list[VerifyJsonRequest]): list of verify requests
"""
remaining_requests = set(
@@ -357,7 +383,7 @@ class Keyring(object):
Args:
fetcher (KeyFetcher): fetcher to use to fetch the keys
- remaining_requests (set[VerifyKeyRequest]): outstanding key requests.
+ remaining_requests (set[VerifyJsonRequest]): outstanding key requests.
Any successfully-completed requests will be removed from the list.
"""
# dict[str, dict[str, int]]: keys to fetch.
@@ -376,7 +402,7 @@ class Keyring(object):
# the requests.
keys_for_server[key_id] = max(
keys_for_server.get(key_id, -1),
- verify_request.minimum_valid_until_ts
+ verify_request.minimum_valid_until_ts,
)
results = yield fetcher.get_keys(missing_keys)
@@ -386,7 +412,7 @@ class Keyring(object):
server_name = verify_request.server_name
# see if any of the keys we got this time are sufficient to
- # complete this VerifyKeyRequest.
+ # complete this VerifyJsonRequest.
result_keys = results.get(server_name, {})
for key_id in verify_request.key_ids:
fetch_key_result = result_keys.get(key_id)
@@ -454,9 +480,7 @@ class BaseV2KeyFetcher(object):
self.config = hs.get_config()
@defer.inlineCallbacks
- def process_v2_response(
- self, from_server, response_json, time_added_ms
- ):
+ def process_v2_response(self, from_server, response_json, time_added_ms):
"""Parse a 'Server Keys' structure from the result of a /key request
This is used to parse either the entirety of the response from
@@ -852,7 +876,7 @@ def _handle_key_deferred(verify_request):
"""Waits for the key to become available, and then performs a verification
Args:
- verify_request (VerifyKeyRequest):
+ verify_request (VerifyJsonRequest):
Returns:
Deferred[None]
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index b541913d82..fc5cfb7d83 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -271,6 +271,7 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
p.sender_domain,
p.redacted_pdu_json,
p.pdu.origin_server_ts if v.enforce_key_validity else 0,
+ p.pdu.event_id,
)
for p in pdus_to_check_sender
]
@@ -306,6 +307,7 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
get_domain_from_id(p.pdu.event_id),
p.redacted_pdu_json,
p.pdu.origin_server_ts if v.enforce_key_validity else 0,
+ p.pdu.event_id,
)
for p in pdus_to_check_event_id
]
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 0db8858cf1..949a5fb2aa 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -140,7 +140,9 @@ class Authenticator(object):
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
)
- yield self.keyring.verify_json_for_server(origin, json_request, now)
+ yield self.keyring.verify_json_for_server(
+ origin, json_request, now, "Incoming request"
+ )
logger.info("Request from %s", origin)
request.authenticated_entity = origin
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index fa6b641ee1..e5dda1975f 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -101,7 +101,9 @@ class GroupAttestationSigning(object):
if valid_until_ms < now:
raise SynapseError(400, "Attestation expired")
- yield self.keyring.verify_json_for_server(server_name, attestation, now)
+ yield self.keyring.verify_json_for_server(
+ server_name, attestation, now, "Group attestation"
+ )
def create_attestation(self, group_id, user_id):
"""Create an attestation for the group_id and user_id with default
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cf4fad7de0..ac5ca79143 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
FederationError,
+ RequestSendFailed,
StoreError,
SynapseError,
)
@@ -2027,9 +2028,21 @@ class FederationHandler(BaseHandler):
"""
room_version = yield self.store.get_room_version(event.room_id)
- yield self._update_auth_events_and_context_for_auth(
- origin, event, context, auth_events
- )
+ try:
+ yield self._update_auth_events_and_context_for_auth(
+ origin, event, context, auth_events
+ )
+ except Exception:
+ # We don't really mind if the above fails, so lets not fail
+ # processing if it does. However, it really shouldn't fail so
+ # let's still log as an exception since we'll still want to fix
+ # any bugs.
+ logger.exception(
+ "Failed to double check auth events for %s with remote. "
+ "Ignoring failure and continuing processing of event.",
+ event.event_id,
+ )
+
try:
self.auth.check(room_version, event, auth_events=auth_events)
except AuthError as e:
@@ -2042,6 +2055,15 @@ class FederationHandler(BaseHandler):
):
"""Helper for do_auth. See there for docs.
+ Checks whether a given event has the expected auth events. If it
+ doesn't then we talk to the remote server to compare state to see if
+ we can come to a consensus (e.g. if one server missed some valid
+ state).
+
+ This attempts to resovle any potential divergence of state between
+ servers, but is not essential and so failures should not block further
+ processing of the event.
+
Args:
origin (str):
event (synapse.events.EventBase):
@@ -2088,9 +2110,15 @@ class FederationHandler(BaseHandler):
missing_auth,
)
try:
- remote_auth_chain = yield self.federation_client.get_event_auth(
- origin, event.room_id, event.event_id
- )
+ try:
+ remote_auth_chain = yield self.federation_client.get_event_auth(
+ origin, event.room_id, event.event_id
+ )
+ except RequestSendFailed as e:
+ # The other side isn't around or doesn't implement the
+ # endpoint, so lets just bail out.
+ logger.info("Failed to get event auth from remote: %s", e)
+ return
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
@@ -2236,12 +2264,18 @@ class FederationHandler(BaseHandler):
try:
# 2. Get remote difference.
- result = yield self.federation_client.query_auth(
- origin,
- event.room_id,
- event.event_id,
- local_auth_chain,
- )
+ try:
+ result = yield self.federation_client.query_auth(
+ origin,
+ event.room_id,
+ event.event_id,
+ local_auth_chain,
+ )
+ except RequestSendFailed as e:
+ # The other side isn't around or doesn't implement the
+ # endpoint, so lets just bail out.
+ logger.info("Failed to query auth from remote: %s", e)
+ return
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 6209858bbb..e49c8203ef 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -828,14 +828,17 @@ class PresenceHandler(object):
# joins.
continue
- event = yield self.store.get_event(event_id)
- if event.content.get("membership") != Membership.JOIN:
+ event = yield self.store.get_event(event_id, allow_none=True)
+ if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue
if prev_event_id:
- prev_event = yield self.store.get_event(prev_event_id)
- if prev_event.content.get("membership") == Membership.JOIN:
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+ if (
+ prev_event
+ and prev_event.content.get("membership") == Membership.JOIN
+ ):
# Ignore changes to join events.
continue
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 0e92b405ba..7ad16c8566 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -115,6 +115,7 @@ class StatsHandler(StateDeltasHandler):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
+ stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
@@ -136,10 +137,15 @@ class StatsHandler(StateDeltasHandler):
event_content = {}
if event_id is not None:
- event_content = (yield self.store.get_event(event_id)).content or {}
+ event = yield self.store.get_event(event_id, allow_none=True)
+ if event:
+ event_content = event.content or {}
+
+ # We use stream_pos here rather than fetch by event_id as event_id
+ # may be None
+ now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
# quantise time to the nearest bucket
- now = yield self.store.get_received_ts(event_id)
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
if typ == EventTypes.Member:
@@ -149,9 +155,11 @@ class StatsHandler(StateDeltasHandler):
# compare them.
prev_event_content = {}
if prev_event_id is not None:
- prev_event_content = (
- yield self.store.get_event(prev_event_id)
- ).content
+ prev_event = yield self.store.get_event(
+ prev_event_id, allow_none=True,
+ )
+ if prev_event:
+ prev_event_content = prev_event.content
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 27e7cbf3cc..babbf6a23c 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -39,6 +39,7 @@ class VersionsRestServlet(RestServlet):
"r0.2.0",
"r0.3.0",
"r0.4.0",
+ "r0.5.0",
],
# as per MSC1497:
"unstable_features": {
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 1782428048..cc7df5cf14 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -78,6 +78,43 @@ class EventsWorkerStore(SQLBaseStore):
desc="get_received_ts",
)
+ def get_received_ts_by_stream_pos(self, stream_ordering):
+ """Given a stream ordering get an approximate timestamp of when it
+ happened.
+
+ This is done by simply taking the received ts of the first event that
+ has a stream ordering greater than or equal to the given stream pos.
+ If none exists returns the current time, on the assumption that it must
+ have happened recently.
+
+ Args:
+ stream_ordering (int)
+
+ Returns:
+ Deferred[int]
+ """
+
+ def _get_approximate_received_ts_txn(txn):
+ sql = """
+ SELECT received_ts FROM events
+ WHERE stream_ordering >= ?
+ LIMIT 1
+ """
+
+ txn.execute(sql, (stream_ordering,))
+ row = txn.fetchone()
+ if row and row[0]:
+ ts = row[0]
+ else:
+ ts = self.clock.time_msec()
+
+ return ts
+
+ return self.runInteraction(
+ "get_approximate_received_ts",
+ _get_approximate_received_ts_txn,
+ )
+
@defer.inlineCallbacks
def get_event(
self,
|