diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 402e44cdef..184eef09d0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -15,6 +15,7 @@
# limitations under the License.
import logging
+import unicodedata
import attr
import bcrypt
@@ -519,6 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
+ yield self._check_mau_limits()
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@@ -626,6 +628,7 @@ class AuthHandler(BaseHandler):
# special case to check for "password" for the check_password interface
# for the auth providers
password = login_submission.get("password")
+
if login_type == LoginType.PASSWORD:
if not self._password_enabled:
raise SynapseError(400, "Password login has been disabled.")
@@ -707,9 +710,10 @@ class AuthHandler(BaseHandler):
multiple inexact matches.
Args:
- user_id (str): complete @user:id
+ user_id (unicode): complete @user:id
+ password (unicode): the provided password
Returns:
- (str) the canonical_user_id, or None if unknown user / bad password
+ (unicode) the canonical_user_id, or None if unknown user / bad password
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@@ -728,15 +732,18 @@ class AuthHandler(BaseHandler):
device_id)
defer.returnValue(access_token)
+ @defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
+ yield self._check_mau_limits()
auth_api = self.hs.get_auth()
+ user_id = None
try:
macaroon = pymacaroons.Macaroon.deserialize(login_token)
user_id = auth_api.get_user_id_from_macaroon(macaroon)
auth_api.validate_macaroon(macaroon, "login", True, user_id)
- return user_id
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+ defer.returnValue(user_id)
@defer.inlineCallbacks
def delete_access_token(self, access_token):
@@ -849,14 +856,19 @@ class AuthHandler(BaseHandler):
"""Computes a secure hash of password.
Args:
- password (str): Password to hash.
+ password (unicode): Password to hash.
Returns:
- Deferred(str): Hashed password.
+ Deferred(unicode): Hashed password.
"""
def _do_hash():
- return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
- bcrypt.gensalt(self.bcrypt_rounds))
+ # Normalise the Unicode in the password
+ pw = unicodedata.normalize("NFKC", password)
+
+ return bcrypt.hashpw(
+ pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
+ bcrypt.gensalt(self.bcrypt_rounds),
+ ).decode('ascii')
return make_deferred_yieldable(
threads.deferToThreadPool(
@@ -868,16 +880,19 @@ class AuthHandler(BaseHandler):
"""Validates that self.hash(password) == stored_hash.
Args:
- password (str): Password to hash.
- stored_hash (str): Expected hash value.
+ password (unicode): Password to hash.
+ stored_hash (unicode): Expected hash value.
Returns:
Deferred(bool): Whether self.hash(password) == stored_hash.
"""
def _do_validate_hash():
+ # Normalise the Unicode in the password
+ pw = unicodedata.normalize("NFKC", password)
+
return bcrypt.checkpw(
- password.encode('utf8') + self.hs.config.password_pepper,
+ pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
stored_hash.encode('utf8')
)
@@ -892,6 +907,19 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
+ @defer.inlineCallbacks
+ def _check_mau_limits(self):
+ """
+ Ensure that if mau blocking is enabled that invalid users cannot
+ log in.
+ """
+ if self.hs.config.limit_usage_by_mau is True:
+ current_mau = yield self.store.count_monthly_users()
+ if current_mau >= self.hs.config.max_mau_value:
+ raise AuthError(
+ 403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
+ )
+
@attr.s
class MacaroonGenerator(object):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c0dd468940..533b82c783 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -76,7 +76,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_federation_client()
+ self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -255,7 +255,7 @@ class FederationHandler(BaseHandler):
# know about
for p in prevs - seen:
state, got_auth_chain = (
- yield self.replication_layer.get_state_for_room(
+ yield self.federation_client.get_state_for_room(
origin, pdu.room_id, p
)
)
@@ -338,7 +338,7 @@ class FederationHandler(BaseHandler):
#
# see https://github.com/matrix-org/synapse/pull/1744
- missing_events = yield self.replication_layer.get_missing_events(
+ missing_events = yield self.federation_client.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
@@ -400,7 +400,7 @@ class FederationHandler(BaseHandler):
)
try:
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
except AuthError as e:
@@ -444,7 +444,7 @@ class FederationHandler(BaseHandler):
yield self._handle_new_events(origin, event_infos)
try:
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin,
event,
state=state,
@@ -469,17 +469,6 @@ class FederationHandler(BaseHandler):
except StoreError:
logger.exception("Failed to store room.")
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
@@ -501,7 +490,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
+ yield self.user_joined_room(user, event.room_id)
@log_function
@defer.inlineCallbacks
@@ -522,7 +511,7 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
- events = yield self.replication_layer.backfill(
+ events = yield self.federation_client.backfill(
dest,
room_id,
limit=limit,
@@ -570,7 +559,7 @@ class FederationHandler(BaseHandler):
state_events = {}
events_to_state = {}
for e_id in edges:
- state, auth = yield self.replication_layer.get_state_for_room(
+ state, auth = yield self.federation_client.get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id
@@ -612,7 +601,7 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
- self.replication_layer.get_pdu,
+ self.federation_client.get_pdu,
[dest],
event_id,
outlier=True,
@@ -893,7 +882,7 @@ class FederationHandler(BaseHandler):
Invites must be signed by the invitee's server before distribution.
"""
- pdu = yield self.replication_layer.send_invite(
+ pdu = yield self.federation_client.send_invite(
destination=target_host,
room_id=event.room_id,
event_id=event.event_id,
@@ -942,7 +931,7 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id] = []
- yield self.store.clean_room_for_join(room_id)
+ yield self._clean_room_for_join(room_id)
handled_events = set()
@@ -955,7 +944,7 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
- ret = yield self.replication_layer.send_join(target_hosts, event)
+ ret = yield self.federation_client.send_join(target_hosts, event)
origin = ret["origin"]
state = ret["state"]
@@ -981,15 +970,10 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[joinee]
- )
-
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -1084,7 +1068,7 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin, event
)
@@ -1094,20 +1078,10 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
+ yield self.user_joined_room(user, event.room_id)
prev_state_ids = yield context.get_prev_state_ids(self.store)
@@ -1176,17 +1150,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self._persist_events([(event, context)])
defer.returnValue(event)
@@ -1211,30 +1175,20 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- yield self.replication_layer.send_leave(
+ yield self.federation_client.send_leave(
target_hosts,
event
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self._persist_events([(event, context)])
defer.returnValue(event)
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={},):
- origin, pdu = yield self.replication_layer.make_membership_event(
+ origin, pdu = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
@@ -1279,7 +1233,7 @@ class FederationHandler(BaseHandler):
@log_function
def on_make_leave_request(self, room_id, user_id):
""" We've received a /make_leave/ request, so we create a partial
- join event for the room and return that. We do *not* persist or
+ leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
builder = self.event_builder_factory.new({
@@ -1318,7 +1272,7 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ yield self._handle_new_event(
origin, event
)
@@ -1328,16 +1282,6 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
defer.returnValue(None)
@defer.inlineCallbacks
@@ -1479,9 +1423,8 @@ class FederationHandler(BaseHandler):
event, context
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
+ yield self._persist_events(
+ [(event, context)],
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
@@ -1494,15 +1437,7 @@ class FederationHandler(BaseHandler):
six.reraise(tp, value, tb)
- if not backfilled:
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- logcontext.run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id,
- )
-
- defer.returnValue((context, event_stream_id, max_stream_id))
+ defer.returnValue(context)
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
@@ -1510,6 +1445,8 @@ class FederationHandler(BaseHandler):
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
+
+ Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
@@ -1524,7 +1461,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self.store.persist_events(
+ yield self._persist_events(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1536,7 +1473,8 @@ class FederationHandler(BaseHandler):
def _persist_auth_tree(self, origin, auth_events, state, event):
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
- Persists the event seperately.
+ Persists the event separately. Notifies about the persisted events
+ where appropriate.
Will attempt to fetch missing auth events.
@@ -1547,8 +1485,7 @@ class FederationHandler(BaseHandler):
event (Event)
Returns:
- 2-tuple of (event_stream_id, max_stream_id) from the persist_event
- call for `event`
+ Deferred
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
@@ -1574,7 +1511,7 @@ class FederationHandler(BaseHandler):
missing_auth_events.add(e_id)
for e_id in missing_auth_events:
- m_ev = yield self.replication_layer.get_pdu(
+ m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
outlier=True,
@@ -1612,7 +1549,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self.store.persist_events(
+ yield self._persist_events(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1623,12 +1560,10 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event, new_event_context,
+ yield self._persist_events(
+ [(event, new_event_context)],
)
- defer.returnValue((event_stream_id, max_stream_id))
-
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
"""
@@ -1794,7 +1729,7 @@ class FederationHandler(BaseHandler):
logger.info("Missing auth: %s", missing_auth)
# If we don't have all the auth events, we need to get them.
try:
- remote_auth_chain = yield self.replication_layer.get_event_auth(
+ remote_auth_chain = yield self.federation_client.get_event_auth(
origin, event.room_id, event.event_id
)
@@ -1910,7 +1845,7 @@ class FederationHandler(BaseHandler):
try:
# 2. Get remote difference.
- result = yield self.replication_layer.query_auth(
+ result = yield self.federation_client.query_auth(
origin,
event.room_id,
event.event_id,
@@ -2209,7 +2144,7 @@ class FederationHandler(BaseHandler):
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
- yield self.replication_layer.forward_third_party_invite(
+ yield self.federation_client.forward_third_party_invite(
destinations,
room_id,
event_dict,
@@ -2364,3 +2299,69 @@ class FederationHandler(BaseHandler):
)
if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid")
+
+ @defer.inlineCallbacks
+ def _persist_events(self, event_and_contexts, backfilled=False):
+ """Persists events and tells the notifier/pushers about them, if
+ necessary.
+
+ Args:
+ event_and_contexts(list[tuple[FrozenEvent, EventContext]])
+ backfilled (bool): Whether these events are a result of
+ backfilling or not
+
+ Returns:
+ Deferred
+ """
+ max_stream_id = yield self.store.persist_events(
+ event_and_contexts,
+ backfilled=backfilled,
+ )
+
+ if not backfilled: # Never notify for backfilled events
+ for event, _ in event_and_contexts:
+ self._notify_persisted_event(event, max_stream_id)
+
+ def _notify_persisted_event(self, event, max_stream_id):
+ """Checks to see if notifier/pushers should be notified about the
+ event or not.
+
+ Args:
+ event (FrozenEvent)
+ max_stream_id (int): The max_stream_id returned by persist_events
+ """
+
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+
+ # We notify for memberships if its an invite for one of our
+ # users
+ if event.internal_metadata.is_outlier():
+ if event.membership != Membership.INVITE:
+ if not self.is_mine_id(target_user_id):
+ return
+
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
+ elif event.internal_metadata.is_outlier():
+ return
+
+ event_stream_id = event.internal_metadata.stream_ordering
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+
+ logcontext.run_in_background(
+ self.pusher_pool.on_new_notifications,
+ event_stream_id, max_stream_id,
+ )
+
+ def _clean_room_for_join(self, room_id):
+ return self.store.clean_room_for_join(room_id)
+
+ def user_joined_room(self, user, room_id):
+ """Called when a new user has joined the room
+ """
+ return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 8c8aedb2b8..1d36d967c3 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.errors import (
CodeMessageException,
Codes,
- MatrixCodeMessageException,
+ HttpResponseException,
SynapseError,
)
@@ -85,7 +85,6 @@ class IdentityHandler(BaseHandler):
)
defer.returnValue(None)
- data = {}
try:
data = yield self.http_client.get_json(
"https://%s%s" % (
@@ -94,11 +93,9 @@ class IdentityHandler(BaseHandler):
),
{'sid': creds['sid'], 'client_secret': client_secret}
)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
logger.info("getValidated3pid failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
- data = json.loads(e.msg)
+ raise e.to_synapse_error()
if 'medium' in data:
defer.returnValue(data)
@@ -136,7 +133,7 @@ class IdentityHandler(BaseHandler):
)
logger.debug("bound threepid %r to %s", creds, mxid)
except CodeMessageException as e:
- data = json.loads(e.msg)
+ data = json.loads(e.msg) # XXX WAT?
defer.returnValue(data)
@defer.inlineCallbacks
@@ -209,12 +206,9 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
- except MatrixCodeMessageException as e:
- logger.info("Proxied requestToken failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
+ except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
- raise e
+ raise e.to_synapse_error()
@defer.inlineCallbacks
def requestMsisdnToken(
@@ -244,9 +238,6 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
- except MatrixCodeMessageException as e:
- logger.info("Proxied requestToken failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
+ except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
- raise e
+ raise e.to_synapse_error()
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 859f6d2b2e..9af2e8f869 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,7 +17,14 @@ import logging
from twisted.internet import defer
-from synapse.api.errors import AuthError, CodeMessageException, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ CodeMessageException,
+ Codes,
+ StoreError,
+ SynapseError,
+)
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, get_domain_from_id
from ._base import BaseHandler
@@ -41,19 +48,24 @@ class ProfileHandler(BaseHandler):
if hs.config.worker_app is None:
self.clock.looping_call(
- self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+ self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
- displayname = yield self.store.get_profile_displayname(
- target_user.localpart
- )
- avatar_url = yield self.store.get_profile_avatar_url(
- target_user.localpart
- )
+ try:
+ displayname = yield self.store.get_profile_displayname(
+ target_user.localpart
+ )
+ avatar_url = yield self.store.get_profile_avatar_url(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue({
"displayname": displayname,
@@ -73,7 +85,6 @@ class ProfileHandler(BaseHandler):
except CodeMessageException as e:
if e.code != 404:
logger.exception("Failed to get displayname")
-
raise
@defer.inlineCallbacks
@@ -84,12 +95,17 @@ class ProfileHandler(BaseHandler):
"""
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
- displayname = yield self.store.get_profile_displayname(
- target_user.localpart
- )
- avatar_url = yield self.store.get_profile_avatar_url(
- target_user.localpart
- )
+ try:
+ displayname = yield self.store.get_profile_displayname(
+ target_user.localpart
+ )
+ avatar_url = yield self.store.get_profile_avatar_url(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue({
"displayname": displayname,
@@ -102,9 +118,14 @@ class ProfileHandler(BaseHandler):
@defer.inlineCallbacks
def get_displayname(self, target_user):
if self.hs.is_mine(target_user):
- displayname = yield self.store.get_profile_displayname(
- target_user.localpart
- )
+ try:
+ displayname = yield self.store.get_profile_displayname(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue(displayname)
else:
@@ -121,7 +142,6 @@ class ProfileHandler(BaseHandler):
except CodeMessageException as e:
if e.code != 404:
logger.exception("Failed to get displayname")
-
raise
except Exception:
logger.exception("Failed to get displayname")
@@ -156,10 +176,14 @@ class ProfileHandler(BaseHandler):
@defer.inlineCallbacks
def get_avatar_url(self, target_user):
if self.hs.is_mine(target_user):
- avatar_url = yield self.store.get_profile_avatar_url(
- target_user.localpart
- )
-
+ try:
+ avatar_url = yield self.store.get_profile_avatar_url(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue(avatar_url)
else:
try:
@@ -212,16 +236,20 @@ class ProfileHandler(BaseHandler):
just_field = args.get("field", None)
response = {}
+ try:
+ if just_field is None or just_field == "displayname":
+ response["displayname"] = yield self.store.get_profile_displayname(
+ user.localpart
+ )
- if just_field is None or just_field == "displayname":
- response["displayname"] = yield self.store.get_profile_displayname(
- user.localpart
- )
-
- if just_field is None or just_field == "avatar_url":
- response["avatar_url"] = yield self.store.get_profile_avatar_url(
- user.localpart
- )
+ if just_field is None or just_field == "avatar_url":
+ response["avatar_url"] = yield self.store.get_profile_avatar_url(
+ user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue(response)
@@ -254,6 +282,12 @@ class ProfileHandler(BaseHandler):
room_id, str(e.message)
)
+ def _start_update_remote_profile_cache(self):
+ return run_as_background_process(
+ "Update remote profile", self._update_remote_profile_cache,
+ )
+
+ @defer.inlineCallbacks
def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we haven't
checked in a while.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7caff0cbc8..289704b241 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -45,7 +45,7 @@ class RegistrationHandler(BaseHandler):
hs (synapse.server.HomeServer):
"""
super(RegistrationHandler, self).__init__(hs)
-
+ self.hs = hs
self.auth = hs.get_auth()
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
@@ -131,7 +131,7 @@ class RegistrationHandler(BaseHandler):
Args:
localpart : The local part of the user ID to register. If None,
one will be generated.
- password (str) : The password to assign to this user so they can
+ password (unicode) : The password to assign to this user so they can
login again. This can be None which means they cannot login again
via a password (e.g. the user is an application service user).
generate_token (bool): Whether a new access token should be
@@ -144,6 +144,7 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
+ yield self._check_mau_limits()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -288,6 +289,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID can only contain characters a-z, 0-9, or '=_-./'",
)
+ yield self._check_mau_limits()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -437,7 +439,7 @@ class RegistrationHandler(BaseHandler):
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
-
+ yield self._check_mau_limits()
need_register = True
try:
@@ -531,3 +533,16 @@ class RegistrationHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
action="join",
)
+
+ @defer.inlineCallbacks
+ def _check_mau_limits(self):
+ """
+ Do not accept registrations if monthly active user limits exceeded
+ and limiting is enabled
+ """
+ if self.hs.config.limit_usage_by_mau is True:
+ current_mau = yield self.store.count_monthly_users()
+ if current_mau >= self.hs.config.max_mau_value:
+ raise RegistrationError(
+ 403, "MAU Limit Exceeded", Codes.MAU_LIMIT_EXCEEDED
+ )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 003b848c00..7b7804d9b2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,6 +15,7 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
+import itertools
import logging
import math
import string
@@ -401,7 +402,7 @@ class RoomContextHandler(object):
self.store = hs.get_datastore()
@defer.inlineCallbacks
- def get_event_context(self, user, room_id, event_id, limit):
+ def get_event_context(self, user, room_id, event_id, limit, event_filter):
"""Retrieves events, pagination tokens and state around a given event
in a room.
@@ -411,6 +412,8 @@ class RoomContextHandler(object):
event_id (str)
limit (int): The maximum number of events to return in total
(excluding state).
+ event_filter (Filter|None): the filter to apply to the events returned
+ (excluding the target event_id)
Returns:
dict, or None if the event isn't found
@@ -443,7 +446,7 @@ class RoomContextHandler(object):
)
results = yield self.store.get_events_around(
- room_id, event_id, before_limit, after_limit
+ room_id, event_id, before_limit, after_limit, event_filter
)
results["events_before"] = yield filter_evts(results["events_before"])
@@ -455,8 +458,23 @@ class RoomContextHandler(object):
else:
last_event_id = event_id
+ types = None
+ filtered_types = None
+ if event_filter and event_filter.lazy_load_members():
+ members = set(ev.sender for ev in itertools.chain(
+ results["events_before"],
+ (results["event"],),
+ results["events_after"],
+ ))
+ filtered_types = [EventTypes.Member]
+ types = [(EventTypes.Member, member) for member in members]
+
+ # XXX: why do we return the state as of the last event rather than the
+ # first? Shouldn't we be consistent with /sync?
+ # https://github.com/matrix-org/matrix-doc/issues/687
+
state = yield self.store.get_state_for_events(
- [last_event_id], None
+ [last_event_id], types, filtered_types=filtered_types,
)
results["state"] = list(state[last_event_id].values())
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 6184737cd4..e13551d569 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -728,6 +728,10 @@ class RoomMemberHandler(object):
inviter_display_name = member_event.content.get("displayname", "")
inviter_avatar_url = member_event.content.get("avatar_url", "")
+ # if user has no display name, default to their MXID
+ if not inviter_display_name:
+ inviter_display_name = user.to_string()
+
canonical_room_alias = ""
canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
if canonical_alias_event:
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 69ae9731d5..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -287,7 +287,7 @@ class SearchHandler(BaseHandler):
contexts = {}
for event in allowed_events:
res = yield self.store.get_events_around(
- event.room_id, event.event_id, before_limit, after_limit
+ event.room_id, event.event_id, before_limit, after_limit,
)
logger.info(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a006c952f2..ea148866e7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
@@ -34,6 +37,15 @@ logger = logging.getLogger(__name__)
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
+
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
"filter_collection",
@@ -184,6 +196,12 @@ class SyncHandler(object):
)
self.state = hs.get_state_handler()
+ # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+ self.lazy_loaded_members_cache = ExpiringCache(
+ "lazy_loaded_members_cache", self.clock,
+ max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+ )
+
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -419,29 +437,44 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event):
+ def get_state_after_event(self, event, types=None, filtered_types=None):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+ state_ids = yield self.store.get_state_ids_for_event(
+ event.event_id, types, filtered_types=filtered_types,
+ )
if event.is_state():
state_ids = state_ids.copy()
state_ids[(event.type, event.state_key)] = event.event_id
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position):
+ def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
@@ -456,7 +489,9 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
- state = yield self.get_state_after_event(last_event)
+ state = yield self.get_state_after_event(
+ last_event, types, filtered_types=filtered_types,
+ )
else:
# no events in this room - so presumably no state
@@ -488,59 +523,129 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+
+ types = None
+ filtered_types = None
+
+ lazy_load_members = sync_config.filter_collection.lazy_load_members()
+ include_redundant_members = (
+ sync_config.filter_collection.include_redundant_members()
+ )
+
+ if lazy_load_members:
+ # We only request state for the members needed to display the
+ # timeline:
+
+ types = [
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in batch.events
+ )
+ ]
+
+ # only apply the filtering to room members
+ filtered_types = [EventTypes.Member]
+
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events if event.is_state()
+ }
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
+
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token
+ room_id, stream_position=now_token, types=types,
+ filtered_types=filtered_types,
)
state_ids = current_state_ids
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
previous={},
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
+ room_id, stream_position=since_token, types=types,
+ filtered_types=filtered_types,
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
+ if lazy_load_members:
+ if types:
+ state_ids = yield self.store.get_state_ids_for_event(
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
+ )
+
+ if lazy_load_members and not include_redundant_members:
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.lazy_loaded_members_cache.get(cache_key)
+ if cache is None:
+ logger.debug("creating LruCache for %r", cache_key)
+ cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+ self.lazy_loaded_members_cache[cache_key] = cache
+ else:
+ logger.debug("found LruCache for %r", cache_key)
+
+ # if it's a new sync sequence, then assume the client has had
+ # amnesia and doesn't want any recent lazy-loaded members
+ # de-duplicated.
+ if since_token is None:
+ logger.debug("clearing LruCache for %r", cache_key)
+ cache.clear()
+ else:
+ # only send members which aren't in our LruCache (either
+ # because they're new to this client or have been pushed out
+ # of the cache)
+ logger.debug("filtering state from %r...", state_ids)
+ state_ids = {
+ t: event_id
+ for t, event_id in state_ids.iteritems()
+ if cache.get(t[1]) != event_id
+ }
+ logger.debug("...to %r", state_ids)
+
+ # add any member IDs we are about to send into our LruCache
+ for t, event_id in itertools.chain(
+ state_ids.items(),
+ timeline_state.items(),
+ ):
+ if t[0] == EventTypes.Member:
+ cache.set(t[1], event_id)
state = {}
if state_ids:
@@ -1451,7 +1556,9 @@ def _action_has_highlight(actions):
return False
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+ timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
"""Works out what state to include in a sync response.
Args:
@@ -1460,6 +1567,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
previous (dict): state at the end of the previous sync (or empty dict
if this is an initial sync)
current (dict): state at the end of the timeline
+ lazy_load_members (bool): whether to return members from timeline_start
+ or not. assumes that timeline_start has already been filtered to
+ include only the members the client needs to know about.
Returns:
dict
@@ -1475,9 +1585,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
}
c_ids = set(e for e in current.values())
- tc_ids = set(e for e in timeline_contains.values())
- p_ids = set(e for e in previous.values())
ts_ids = set(e for e in timeline_start.values())
+ p_ids = set(e for e in previous.values())
+ tc_ids = set(e for e in timeline_contains.values())
+
+ # If we are lazyloading room members, we explicitly add the membership events
+ # for the senders in the timeline into the state block returned by /sync,
+ # as we may not have sent them to the client before. We find these membership
+ # events by filtering them out of timeline_start, which has already been filtered
+ # to only include membership events for the senders in the timeline.
+ # In practice, we can do this by removing them from the p_ids list,
+ # which is the list of relevant state we know we have already sent to the client.
+ # see https://github.com/matrix-org/synapse/pull/2970
+ # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+ if lazy_load_members:
+ p_ids.difference_update(
+ e for t, e in timeline_start.iteritems()
+ if t[0] == EventTypes.Member
+ )
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
|