diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index aa5d89a9ac..7f8ddc99c6 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -162,7 +162,7 @@ class AuthHandler(BaseHandler):
defer.returnValue(params)
@defer.inlineCallbacks
- def check_auth(self, flows, clientdict, clientip):
+ def check_auth(self, flows, clientdict, clientip, password_servlet=False):
"""
Takes a dictionary sent by the client in the login / registration
protocol and handles the User-Interactive Auth flow.
@@ -186,6 +186,16 @@ class AuthHandler(BaseHandler):
clientip (str): The IP address of the client.
+ password_servlet (bool): Whether the request originated from
+ PasswordRestServlet.
+ XXX: This is a temporary hack to distinguish between checking
+ for threepid validations locally (in the case of password
+ resets) and using the identity server (in the case of binding
+ a 3PID during registration). Once we start using the
+ homeserver for both tasks, this distinction will no longer be
+ necessary.
+
+
Returns:
defer.Deferred[dict, dict, str]: a deferred tuple of
(creds, params, session_id).
@@ -241,7 +251,9 @@ class AuthHandler(BaseHandler):
if 'type' in authdict:
login_type = authdict['type']
try:
- result = yield self._check_auth_dict(authdict, clientip)
+ result = yield self._check_auth_dict(
+ authdict, clientip, password_servlet=password_servlet,
+ )
if result:
creds[login_type] = result
self._save_session(session)
@@ -351,7 +363,7 @@ class AuthHandler(BaseHandler):
return sess.setdefault('serverdict', {}).get(key, default)
@defer.inlineCallbacks
- def _check_auth_dict(self, authdict, clientip):
+ def _check_auth_dict(self, authdict, clientip, password_servlet=False):
"""Attempt to validate the auth dict provided by a client
Args:
@@ -369,7 +381,13 @@ class AuthHandler(BaseHandler):
login_type = authdict['type']
checker = self.checkers.get(login_type)
if checker is not None:
- res = yield checker(authdict, clientip)
+ # XXX: Temporary workaround for having Synapse handle password resets
+ # See AuthHandler.check_auth for further details
+ res = yield checker(
+ authdict,
+ clientip=clientip,
+ password_servlet=password_servlet,
+ )
defer.returnValue(res)
# build a v1-login-style dict out of the authdict and fall back to the
@@ -383,7 +401,7 @@ class AuthHandler(BaseHandler):
defer.returnValue(canonical_id)
@defer.inlineCallbacks
- def _check_recaptcha(self, authdict, clientip):
+ def _check_recaptcha(self, authdict, clientip, **kwargs):
try:
user_response = authdict["response"]
except KeyError:
@@ -429,20 +447,20 @@ class AuthHandler(BaseHandler):
defer.returnValue(True)
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
- def _check_email_identity(self, authdict, _):
- return self._check_threepid('email', authdict)
+ def _check_email_identity(self, authdict, **kwargs):
+ return self._check_threepid('email', authdict, **kwargs)
- def _check_msisdn(self, authdict, _):
+ def _check_msisdn(self, authdict, **kwargs):
return self._check_threepid('msisdn', authdict)
- def _check_dummy_auth(self, authdict, _):
+ def _check_dummy_auth(self, authdict, **kwargs):
return defer.succeed(True)
- def _check_terms_auth(self, authdict, _):
+ def _check_terms_auth(self, authdict, **kwargs):
return defer.succeed(True)
@defer.inlineCallbacks
- def _check_threepid(self, medium, authdict):
+ def _check_threepid(self, medium, authdict, password_servlet=False, **kwargs):
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@@ -451,7 +469,29 @@ class AuthHandler(BaseHandler):
identity_handler = self.hs.get_handlers().identity_handler
logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
- threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+ if (
+ not password_servlet
+ or self.hs.config.email_password_reset_behaviour == "remote"
+ ):
+ threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+ elif self.hs.config.email_password_reset_behaviour == "local":
+ row = yield self.store.get_threepid_validation_session(
+ medium,
+ threepid_creds["client_secret"],
+ sid=threepid_creds["sid"],
+ )
+
+ threepid = {
+ "medium": row["medium"],
+ "address": row["address"],
+ "validated_at": row["validated_at"],
+ } if row else None
+
+ if row:
+ # Valid threepid returned, delete from the db
+ yield self.store.delete_threepid_session(threepid_creds["sid"])
+ else:
+ raise SynapseError(400, "Password resets are not enabled on this homeserver")
if not threepid:
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 6003ad9cca..eb525070cf 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -122,6 +122,9 @@ class EventStreamHandler(BaseHandler):
chunks = yield self._event_serializer.serialize_events(
events, time_now, as_client_event=as_client_event,
+ # We don't bundle "live" events, as otherwise clients
+ # will end up double counting annotations.
+ bundle_aggregations=False,
)
chunk = {
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0684778882..ac5ca79143 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
FederationError,
+ RequestSendFailed,
StoreError,
SynapseError,
)
@@ -1916,6 +1917,11 @@ class FederationHandler(BaseHandler):
event.room_id, latest_event_ids=extrem_ids,
)
+ logger.debug(
+ "Doing soft-fail check for %s: state %s",
+ event.event_id, current_state_ids,
+ )
+
# Now check if event pass auth against said current state
auth_types = auth_types_for_event(event)
current_state_ids = [
@@ -1932,7 +1938,7 @@ class FederationHandler(BaseHandler):
self.auth.check(room_version, event, auth_events=current_auth_events)
except AuthError as e:
logger.warn(
- "Failed current state auth resolution for %r because %s",
+ "Soft-failing %r because %s",
event, e,
)
event.internal_metadata.soft_failed = True
@@ -2008,15 +2014,65 @@ class FederationHandler(BaseHandler):
Args:
origin (str):
- event (synapse.events.FrozenEvent):
+ event (synapse.events.EventBase):
context (synapse.events.snapshot.EventContext):
- auth_events (dict[(str, str)->str]):
+ auth_events (dict[(str, str)->synapse.events.EventBase]):
+ Map from (event_type, state_key) to event
+
+ What we expect the event's auth_events to be, based on the event's
+ position in the dag. I think? maybe??
+
+ Also NB that this function adds entries to it.
+ Returns:
+ defer.Deferred[None]
+ """
+ room_version = yield self.store.get_room_version(event.room_id)
+
+ try:
+ yield self._update_auth_events_and_context_for_auth(
+ origin, event, context, auth_events
+ )
+ except Exception:
+ # We don't really mind if the above fails, so lets not fail
+ # processing if it does. However, it really shouldn't fail so
+ # let's still log as an exception since we'll still want to fix
+ # any bugs.
+ logger.exception(
+ "Failed to double check auth events for %s with remote. "
+ "Ignoring failure and continuing processing of event.",
+ event.event_id,
+ )
+
+ try:
+ self.auth.check(room_version, event, auth_events=auth_events)
+ except AuthError as e:
+ logger.warn("Failed auth resolution for %r because %s", event, e)
+ raise e
+
+ @defer.inlineCallbacks
+ def _update_auth_events_and_context_for_auth(
+ self, origin, event, context, auth_events
+ ):
+ """Helper for do_auth. See there for docs.
+
+ Checks whether a given event has the expected auth events. If it
+ doesn't then we talk to the remote server to compare state to see if
+ we can come to a consensus (e.g. if one server missed some valid
+ state).
+
+ This attempts to resovle any potential divergence of state between
+ servers, but is not essential and so failures should not block further
+ processing of the event.
+
+ Args:
+ origin (str):
+ event (synapse.events.EventBase):
+ context (synapse.events.snapshot.EventContext):
+ auth_events (dict[(str, str)->synapse.events.EventBase]):
Returns:
defer.Deferred[None]
"""
- # Check if we have all the auth events.
- current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(event.auth_event_ids())
if event.is_state():
@@ -2024,11 +2080,21 @@ class FederationHandler(BaseHandler):
else:
event_key = None
- if event_auth_events - current_state:
+ # if the event's auth_events refers to events which are not in our
+ # calculated auth_events, we need to fetch those events from somewhere.
+ #
+ # we start by fetching them from the store, and then try calling /event_auth/.
+ missing_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
+
+ if missing_auth:
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(
- event_auth_events - current_state
+ missing_auth
)
+ logger.debug("Got events %s from store", have_events)
+ missing_auth.difference_update(have_events.keys())
else:
have_events = {}
@@ -2037,17 +2103,22 @@ class FederationHandler(BaseHandler):
for e in auth_events.values()
})
- seen_events = set(have_events.keys())
-
- missing_auth = event_auth_events - seen_events - current_state
-
if missing_auth:
- logger.info("Missing auth: %s", missing_auth)
# If we don't have all the auth events, we need to get them.
+ logger.info(
+ "auth_events contains unknown events: %s",
+ missing_auth,
+ )
try:
- remote_auth_chain = yield self.federation_client.get_event_auth(
- origin, event.room_id, event.event_id
- )
+ try:
+ remote_auth_chain = yield self.federation_client.get_event_auth(
+ origin, event.room_id, event.event_id
+ )
+ except RequestSendFailed as e:
+ # The other side isn't around or doesn't implement the
+ # endpoint, so lets just bail out.
+ logger.info("Failed to get event auth from remote: %s", e)
+ return
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
@@ -2084,145 +2155,174 @@ class FederationHandler(BaseHandler):
have_events = yield self.store.get_seen_events_with_rejections(
event.auth_event_ids()
)
- seen_events = set(have_events.keys())
except Exception:
# FIXME:
logger.exception("Failed to get auth chain")
+ if event.internal_metadata.is_outlier():
+ logger.info("Skipping auth_event fetch for outlier")
+ return
+
# FIXME: Assumes we have and stored all the state for all the
# prev_events
- current_state = set(e.event_id for e in auth_events.values())
- different_auth = event_auth_events - current_state
+ different_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
- room_version = yield self.store.get_room_version(event.room_id)
+ if not different_auth:
+ return
- if different_auth and not event.internal_metadata.is_outlier():
- # Do auth conflict res.
- logger.info("Different auth: %s", different_auth)
-
- different_events = yield logcontext.make_deferred_yieldable(
- defer.gatherResults([
- logcontext.run_in_background(
- self.store.get_event,
- d,
- allow_none=True,
- allow_rejected=False,
- )
- for d in different_auth
- if d in have_events and not have_events[d]
- ], consumeErrors=True)
- ).addErrback(unwrapFirstError)
-
- if different_events:
- local_view = dict(auth_events)
- remote_view = dict(auth_events)
- remote_view.update({
- (d.type, d.state_key): d for d in different_events if d
- })
+ logger.info(
+ "auth_events refers to events which are not in our calculated auth "
+ "chain: %s",
+ different_auth,
+ )
+
+ room_version = yield self.store.get_room_version(event.room_id)
- new_state = yield self.state_handler.resolve_events(
- room_version,
- [list(local_view.values()), list(remote_view.values())],
- event
+ different_events = yield logcontext.make_deferred_yieldable(
+ defer.gatherResults([
+ logcontext.run_in_background(
+ self.store.get_event,
+ d,
+ allow_none=True,
+ allow_rejected=False,
)
+ for d in different_auth
+ if d in have_events and not have_events[d]
+ ], consumeErrors=True)
+ ).addErrback(unwrapFirstError)
+
+ if different_events:
+ local_view = dict(auth_events)
+ remote_view = dict(auth_events)
+ remote_view.update({
+ (d.type, d.state_key): d for d in different_events if d
+ })
- auth_events.update(new_state)
+ new_state = yield self.state_handler.resolve_events(
+ room_version,
+ [list(local_view.values()), list(remote_view.values())],
+ event
+ )
- current_state = set(e.event_id for e in auth_events.values())
- different_auth = event_auth_events - current_state
+ logger.info(
+ "After state res: updating auth_events with new state %s",
+ {
+ (d.type, d.state_key): d.event_id for d in new_state.values()
+ if auth_events.get((d.type, d.state_key)) != d
+ },
+ )
- yield self._update_context_for_auth_events(
- event, context, auth_events, event_key,
- )
+ auth_events.update(new_state)
- if different_auth and not event.internal_metadata.is_outlier():
- logger.info("Different auth after resolution: %s", different_auth)
+ different_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
- # Only do auth resolution if we have something new to say.
- # We can't rove an auth failure.
- do_resolution = False
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
- provable = [
- RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR,
- ]
+ if not different_auth:
+ # we're done
+ return
- for e_id in different_auth:
- if e_id in have_events:
- if have_events[e_id] in provable:
- do_resolution = True
- break
+ logger.info(
+ "auth_events still refers to events which are not in the calculated auth "
+ "chain after state resolution: %s",
+ different_auth,
+ )
- if do_resolution:
- prev_state_ids = yield context.get_prev_state_ids(self.store)
- # 1. Get what we think is the auth chain.
- auth_ids = yield self.auth.compute_auth_events(
- event, prev_state_ids
- )
- local_auth_chain = yield self.store.get_auth_chain(
- auth_ids, include_given=True
- )
+ # Only do auth resolution if we have something new to say.
+ # We can't prove an auth failure.
+ do_resolution = False
- try:
- # 2. Get remote difference.
- result = yield self.federation_client.query_auth(
- origin,
- event.room_id,
- event.event_id,
- local_auth_chain,
- )
+ for e_id in different_auth:
+ if e_id in have_events:
+ if have_events[e_id] == RejectedReason.NOT_ANCESTOR:
+ do_resolution = True
+ break
- seen_remotes = yield self.store.have_seen_events(
- [e.event_id for e in result["auth_chain"]]
- )
+ if not do_resolution:
+ logger.info(
+ "Skipping auth resolution due to lack of provable rejection reasons"
+ )
+ return
- # 3. Process any remote auth chain events we haven't seen.
- for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes:
- continue
+ logger.info("Doing auth resolution")
- if ev.event_id == event.event_id:
- continue
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
- try:
- auth_ids = ev.auth_event_ids()
- auth = {
- (e.type, e.state_key): e
- for e in result["auth_chain"]
- if e.event_id in auth_ids
- or event.type == EventTypes.Create
- }
- ev.internal_metadata.outlier = True
+ # 1. Get what we think is the auth chain.
+ auth_ids = yield self.auth.compute_auth_events(
+ event, prev_state_ids
+ )
+ local_auth_chain = yield self.store.get_auth_chain(
+ auth_ids, include_given=True
+ )
- logger.debug(
- "do_auth %s different_auth: %s",
- event.event_id, e.event_id
- )
+ try:
+ # 2. Get remote difference.
+ try:
+ result = yield self.federation_client.query_auth(
+ origin,
+ event.room_id,
+ event.event_id,
+ local_auth_chain,
+ )
+ except RequestSendFailed as e:
+ # The other side isn't around or doesn't implement the
+ # endpoint, so lets just bail out.
+ logger.info("Failed to query auth from remote: %s", e)
+ return
+
+ seen_remotes = yield self.store.have_seen_events(
+ [e.event_id for e in result["auth_chain"]]
+ )
- yield self._handle_new_event(
- origin, ev, auth_events=auth
- )
+ # 3. Process any remote auth chain events we haven't seen.
+ for ev in result["auth_chain"]:
+ if ev.event_id in seen_remotes:
+ continue
- if ev.event_id in event_auth_events:
- auth_events[(ev.type, ev.state_key)] = ev
- except AuthError:
- pass
+ if ev.event_id == event.event_id:
+ continue
- except Exception:
- # FIXME:
- logger.exception("Failed to query auth chain")
+ try:
+ auth_ids = ev.auth_event_ids()
+ auth = {
+ (e.type, e.state_key): e
+ for e in result["auth_chain"]
+ if e.event_id in auth_ids
+ or event.type == EventTypes.Create
+ }
+ ev.internal_metadata.outlier = True
+
+ logger.debug(
+ "do_auth %s different_auth: %s",
+ event.event_id, e.event_id
+ )
- # 4. Look at rejects and their proofs.
- # TODO.
+ yield self._handle_new_event(
+ origin, ev, auth_events=auth
+ )
- yield self._update_context_for_auth_events(
- event, context, auth_events, event_key,
- )
+ if ev.event_id in event_auth_events:
+ auth_events[(ev.type, ev.state_key)] = ev
+ except AuthError:
+ pass
- try:
- self.auth.check(room_version, event, auth_events=auth_events)
- except AuthError as e:
- logger.warn("Failed auth resolution for %r because %s", event, e)
- raise e
+ except Exception:
+ # FIXME:
+ logger.exception("Failed to query auth chain")
+
+ # 4. Look at rejects and their proofs.
+ # TODO.
+
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events,
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 22469486d7..04caf65793 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -247,7 +247,14 @@ class IdentityHandler(BaseHandler):
defer.returnValue(changed)
@defer.inlineCallbacks
- def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
+ def requestEmailToken(
+ self,
+ id_server,
+ email,
+ client_secret,
+ send_attempt,
+ next_link=None,
+ ):
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
@@ -259,7 +266,9 @@ class IdentityHandler(BaseHandler):
'client_secret': client_secret,
'send_attempt': send_attempt,
}
- params.update(kwargs)
+
+ if next_link:
+ params.update({'next_link': next_link})
try:
data = yield self.http_client.post_json_get_json(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b2c33a922..0b02469ceb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RelationTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -166,6 +166,9 @@ class MessageHandler(object):
now = self.clock.time_msec()
events = yield self._event_serializer.serialize_events(
room_state.values(), now,
+ # We don't bother bundling aggregations in when asked for state
+ # events, as clients won't use them.
+ bundle_aggregations=False,
)
defer.returnValue(events)
@@ -601,6 +604,20 @@ class EventCreationHandler(object):
self.validator.validate_new(event)
+ # If this event is an annotation then we check that that the sender
+ # can't annotate the same way twice (e.g. stops users from liking an
+ # event multiple times).
+ relation = event.content.get("m.relates_to", {})
+ if relation.get("rel_type") == RelationTypes.ANNOTATION:
+ relates_to = relation["event_id"]
+ aggregation_key = relation["key"]
+
+ already_exists = yield self.store.has_user_annotated_event(
+ relates_to, event.type, aggregation_key, event.sender,
+ )
+ if already_exists:
+ raise SynapseError(400, "Can't send same reaction twice")
+
logger.debug(
"Created event %s",
event.event_id,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 59d53f1050..557fb5f83d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -158,7 +158,13 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
- hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+ hs.get_reactor().addSystemEventTrigger(
+ "before",
+ "shutdown",
+ run_as_background_process,
+ "presence.on_shutdown",
+ self._on_shutdown,
+ )
self.serial_to_user = {}
self._next_serial = 1
@@ -182,17 +188,27 @@ class PresenceHandler(object):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
+ def run_timeout_handler():
+ return run_as_background_process(
+ "handle_presence_timeouts", self._handle_timeouts
+ )
+
self.clock.call_later(
30,
self.clock.looping_call,
- self._handle_timeouts,
+ run_timeout_handler,
5000,
)
+ def run_persister():
+ return run_as_background_process(
+ "persist_presence_changes", self._persist_unpersisted_changes
+ )
+
self.clock.call_later(
60,
self.clock.looping_call,
- self._persist_unpersisted_changes,
+ run_persister,
60 * 1000,
)
@@ -229,6 +245,7 @@ class PresenceHandler(object):
)
if self.unpersisted_users_changes:
+
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in self.unpersisted_users_changes
@@ -240,30 +257,18 @@ class PresenceHandler(object):
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
"""
- logger.info(
- "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
- len(self.unpersisted_users_changes)
- )
-
unpersisted = self.unpersisted_users_changes
self.unpersisted_users_changes = set()
if unpersisted:
+ logger.info(
+ "Persisting %d upersisted presence updates", len(unpersisted)
+ )
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in unpersisted
])
- logger.info("Finished _persist_unpersisted_changes")
-
- @defer.inlineCallbacks
- def _update_states_and_catch_exception(self, new_states):
- try:
- res = yield self._update_states(new_states)
- defer.returnValue(res)
- except Exception:
- logger.exception("Error updating presence")
-
@defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +343,41 @@ class PresenceHandler(object):
logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- try:
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = set(self.wheel_timer.fetch(now))
-
- # Check whether the lists of syncing processes from an external
- # process have expired.
- expired_process_ids = [
- process_id for process_id, last_update
- in self.external_process_last_updated_ms.items()
- if now - last_update > EXTERNAL_PROCESS_EXPIRY
- ]
- for process_id in expired_process_ids:
- users_to_check.update(
- self.external_process_last_updated_ms.pop(process_id, ())
- )
- self.external_process_last_update.pop(process_id)
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in users_to_check
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc(len(states))
+ timers_fired_counter.inc(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.is_mine_id,
- syncing_user_ids=self.get_currently_syncing_users(),
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- run_in_background(self._update_states_and_catch_exception, changes)
- except Exception:
- logger.exception("Exception in _handle_timeouts loop")
+ return self._update_states(changes)
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
@@ -833,14 +834,17 @@ class PresenceHandler(object):
# joins.
continue
- event = yield self.store.get_event(event_id)
- if event.content.get("membership") != Membership.JOIN:
+ event = yield self.store.get_event(event_id, allow_none=True)
+ if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue
if prev_event_id:
- prev_event = yield self.store.get_event(prev_event_id)
- if prev_event.content.get("membership") == Membership.JOIN:
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+ if (
+ prev_event
+ and prev_event.content.get("membership") == Membership.JOIN
+ ):
# Ignore changes to join events.
continue
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 91fc718ff8..a5fc6c5dbf 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,6 +31,9 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+MAX_DISPLAYNAME_LEN = 100
+MAX_AVATAR_URL_LEN = 1000
+
class BaseProfileHandler(BaseHandler):
"""Handles fetching and updating user profile information.
@@ -162,6 +165,11 @@ class BaseProfileHandler(BaseHandler):
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
+ if len(new_displayname) > MAX_DISPLAYNAME_LEN:
+ raise SynapseError(
+ 400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN, ),
+ )
+
if new_displayname == '':
new_displayname = None
@@ -217,6 +225,11 @@ class BaseProfileHandler(BaseHandler):
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url")
+ if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
+ raise SynapseError(
+ 400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN, ),
+ )
+
yield self.store.set_profile_avatar_url(
target_user.localpart, new_avatar_url
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a51d11a257..9a388ea013 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -19,7 +19,7 @@ import logging
from twisted.internet import defer
from synapse import types
-from synapse.api.constants import LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, LoginType
from synapse.api.errors import (
AuthError,
Codes,
@@ -123,6 +123,15 @@ class RegistrationHandler(BaseHandler):
self.check_user_id_not_appservice_exclusive(user_id)
+ if len(user_id) > MAX_USERID_LENGTH:
+ raise SynapseError(
+ 400,
+ "User ID may not be longer than %s characters" % (
+ MAX_USERID_LENGTH,
+ ),
+ Codes.INVALID_USERNAME
+ )
+
users = yield self.store.get_users_by_id_case_insensitive(user_id)
if users:
if not guest_access_token:
@@ -522,6 +531,8 @@ class RegistrationHandler(BaseHandler):
A tuple of (user_id, access_token).
Raises:
RegistrationError if there was a problem registering.
+
+ NB this is only used in tests. TODO: move it to the test package!
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e37ae96899..4a17911a87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -70,6 +70,7 @@ class RoomCreationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
+ self.config = hs.config
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -475,7 +476,11 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
- room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
+ room_version = config.get(
+ "room_version",
+ self.config.default_room_version.identifier,
+ )
+
if not isinstance(room_version, string_types):
raise SynapseError(
400,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ffc588d454..93ac986c86 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -944,7 +944,7 @@ class RoomMemberHandler(object):
}
if self.config.invite_3pid_guest:
- guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
+ guest_user_id, guest_access_token = yield self.get_or_register_3pid_guest(
requester=requester,
medium=medium,
address=address,
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..7ad16c8566
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,333 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import UserID
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class StatsHandler(StateDeltasHandler):
+ """Handles keeping the *_stats tables updated with a simple time-series of
+ information about the users, rooms and media on the server, such that admins
+ have some idea of who is consuming their resources.
+
+ Heavily derived from UserDirectoryHandler
+ """
+
+ def __init__(self, hs):
+ super(StatsHandler, self).__init__(hs)
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
+ self.server_name = hs.hostname
+ self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
+ self.stats_bucket_size = hs.config.stats_bucket_size
+
+ # The current position in the current_state_delta stream
+ self.pos = None
+
+ # Guard to ensure we only process deltas one at a time
+ self._is_processing = False
+
+ if hs.config.stats_enabled:
+ self.notifier.add_replication_callback(self.notify_new_event)
+
+ # We kick this off so that we don't have to wait for a change before
+ # we start populating stats
+ self.clock.call_later(0, self.notify_new_event)
+
+ def notify_new_event(self):
+ """Called when there may be more deltas to process
+ """
+ if not self.hs.config.stats_enabled:
+ return
+
+ if self._is_processing:
+ return
+
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
+ self._is_processing = True
+ run_as_background_process("stats.notify_new_event", process)
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ # If self.pos is None then means we haven't fetched it from DB
+ if self.pos is None:
+ self.pos = yield self.store.get_stats_stream_pos()
+
+ # If still None then the initial background update hasn't happened yet
+ if self.pos is None:
+ defer.returnValue(None)
+
+ # Loop round handling deltas until we're up to date
+ while True:
+ with Measure(self.clock, "stats_delta"):
+ deltas = yield self.store.get_current_state_deltas(self.pos)
+ if not deltas:
+ return
+
+ logger.info("Handling %d state deltas", len(deltas))
+ yield self._handle_deltas(deltas)
+
+ self.pos = deltas[-1]["stream_id"]
+ yield self.store.update_stats_stream_pos(self.pos)
+
+ event_processing_positions.labels("stats").set(self.pos)
+
+ @defer.inlineCallbacks
+ def _handle_deltas(self, deltas):
+ """
+ Called with the state deltas to process
+ """
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ stream_id = delta["stream_id"]
+ prev_event_id = delta["prev_event_id"]
+ stream_pos = delta["stream_id"]
+
+ logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+ token = yield self.store.get_earliest_token_for_room_stats(room_id)
+
+ # If the earliest token to begin from is larger than our current
+ # stream ID, skip processing this delta.
+ if token is not None and token >= stream_id:
+ logger.debug(
+ "Ignoring: %s as earlier than this room's initial ingestion event",
+ event_id,
+ )
+ continue
+
+ if event_id is None and prev_event_id is None:
+ # Errr...
+ continue
+
+ event_content = {}
+
+ if event_id is not None:
+ event = yield self.store.get_event(event_id, allow_none=True)
+ if event:
+ event_content = event.content or {}
+
+ # We use stream_pos here rather than fetch by event_id as event_id
+ # may be None
+ now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+
+ # quantise time to the nearest bucket
+ now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+
+ if typ == EventTypes.Member:
+ # we could use _get_key_change here but it's a bit inefficient
+ # given we're not testing for a specific result; might as well
+ # just grab the prev_membership and membership strings and
+ # compare them.
+ prev_event_content = {}
+ if prev_event_id is not None:
+ prev_event = yield self.store.get_event(
+ prev_event_id, allow_none=True,
+ )
+ if prev_event:
+ prev_event_content = prev_event.content
+
+ membership = event_content.get("membership", Membership.LEAVE)
+ prev_membership = prev_event_content.get("membership", Membership.LEAVE)
+
+ if prev_membership == membership:
+ continue
+
+ if prev_membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "joined_members", -1
+ )
+ elif prev_membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "invited_members", -1
+ )
+ elif prev_membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "left_members", -1
+ )
+ elif prev_membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "banned_members", -1
+ )
+ else:
+ err = "%s is not a valid prev_membership" % (repr(prev_membership),)
+ logger.error(err)
+ raise ValueError(err)
+
+ if membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "joined_members", +1
+ )
+ elif membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "invited_members", +1
+ )
+ elif membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "left_members", +1
+ )
+ elif membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "banned_members", +1
+ )
+ else:
+ err = "%s is not a valid membership" % (repr(membership),)
+ logger.error(err)
+ raise ValueError(err)
+
+ user_id = state_key
+ if self.is_mine_id(user_id):
+ # update user_stats as it's one of our users
+ public = yield self._is_public_room(room_id)
+
+ if membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now,
+ "user",
+ user_id,
+ "public_rooms" if public else "private_rooms",
+ -1,
+ )
+ elif membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now,
+ "user",
+ user_id,
+ "public_rooms" if public else "private_rooms",
+ +1,
+ )
+
+ elif typ == EventTypes.Create:
+ # Newly created room. Add it with all blank portions.
+ yield self.store.update_room_state(
+ room_id,
+ {
+ "join_rules": None,
+ "history_visibility": None,
+ "encryption": None,
+ "name": None,
+ "topic": None,
+ "avatar": None,
+ "canonical_alias": None,
+ },
+ )
+
+ elif typ == EventTypes.JoinRules:
+ yield self.store.update_room_state(
+ room_id, {"join_rules": event_content.get("join_rule")}
+ )
+
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
+
+ elif typ == EventTypes.RoomHistoryVisibility:
+ yield self.store.update_room_state(
+ room_id,
+ {"history_visibility": event_content.get("history_visibility")},
+ )
+
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "history_visibility", "world_readable"
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
+
+ elif typ == EventTypes.Encryption:
+ yield self.store.update_room_state(
+ room_id, {"encryption": event_content.get("algorithm")}
+ )
+ elif typ == EventTypes.Name:
+ yield self.store.update_room_state(
+ room_id, {"name": event_content.get("name")}
+ )
+ elif typ == EventTypes.Topic:
+ yield self.store.update_room_state(
+ room_id, {"topic": event_content.get("topic")}
+ )
+ elif typ == EventTypes.RoomAvatar:
+ yield self.store.update_room_state(
+ room_id, {"avatar": event_content.get("url")}
+ )
+ elif typ == EventTypes.CanonicalAlias:
+ yield self.store.update_room_state(
+ room_id, {"canonical_alias": event_content.get("alias")}
+ )
+
+ @defer.inlineCallbacks
+ def update_public_room_stats(self, ts, room_id, is_public):
+ """
+ Increment/decrement a user's number of public rooms when a room they are
+ in changes to/from public visibility.
+
+ Args:
+ ts (int): Timestamp in seconds
+ room_id (str)
+ is_public (bool)
+ """
+ # For now, blindly iterate over all local users in the room so that
+ # we can handle the whole problem of copying buckets over as needed
+ user_ids = yield self.store.get_users_in_room(room_id)
+
+ for user_id in user_ids:
+ if self.hs.is_mine(UserID.from_string(user_id)):
+ yield self.store.update_stats_delta(
+ ts, "user", user_id, "public_rooms", +1 if is_public else -1
+ )
+ yield self.store.update_stats_delta(
+ ts, "user", user_id, "private_rooms", -1 if is_public else +1
+ )
+
+ @defer.inlineCallbacks
+ def _is_public_room(self, room_id):
+ join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
+ history_visibility = yield self.state.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility
+ )
+
+ if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
+ (
+ history_visibility
+ and history_visibility.content.get("history_visibility")
+ == "world_readable"
+ )
+ ):
+ defer.returnValue(True)
+ else:
+ defer.returnValue(False)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1ee9a6e313..62fda0c664 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -583,30 +583,42 @@ class SyncHandler(object):
)
# if the room has a name or canonical_alias set, we can skip
- # calculating heroes. we assume that if the event has contents, it'll
- # be a valid name or canonical_alias - i.e. we're checking that they
- # haven't been "deleted" by blatting {} over the top.
+ # calculating heroes. Empty strings are falsey, so we check
+ # for the "name" value and default to an empty string.
if name_id:
name = yield self.store.get_event(name_id, allow_none=True)
- if name and name.content:
+ if name and name.content.get("name"):
defer.returnValue(summary)
if canonical_alias_id:
canonical_alias = yield self.store.get_event(
canonical_alias_id, allow_none=True,
)
- if canonical_alias and canonical_alias.content:
+ if canonical_alias and canonical_alias.content.get("alias"):
defer.returnValue(summary)
+ me = sync_config.user.to_string()
+
joined_user_ids = [
- r[0] for r in details.get(Membership.JOIN, empty_ms).members
+ r[0]
+ for r in details.get(Membership.JOIN, empty_ms).members
+ if r[0] != me
]
invited_user_ids = [
- r[0] for r in details.get(Membership.INVITE, empty_ms).members
+ r[0]
+ for r in details.get(Membership.INVITE, empty_ms).members
+ if r[0] != me
]
gone_user_ids = (
- [r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
- [r[0] for r in details.get(Membership.BAN, empty_ms).members]
+ [
+ r[0]
+ for r in details.get(Membership.LEAVE, empty_ms).members
+ if r[0] != me
+ ] + [
+ r[0]
+ for r in details.get(Membership.BAN, empty_ms).members
+ if r[0] != me
+ ]
)
# FIXME: only build up a member_ids list for our heroes
@@ -621,22 +633,13 @@ class SyncHandler(object):
member_ids[user_id] = event_id
# FIXME: order by stream ordering rather than as returned by SQL
- me = sync_config.user.to_string()
if (joined_user_ids or invited_user_ids):
summary['m.heroes'] = sorted(
- [
- user_id
- for user_id in (joined_user_ids + invited_user_ids)
- if user_id != me
- ]
+ [user_id for user_id in (joined_user_ids + invited_user_ids)]
)[0:5]
else:
summary['m.heroes'] = sorted(
- [
- user_id
- for user_id in gone_user_ids
- if user_id != me
- ]
+ [user_id for user_id in gone_user_ids]
)[0:5]
if not sync_config.filter_collection.lazy_load_members():
|