diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
new file mode 100644
index 0000000000..261446517d
--- /dev/null
+++ b/synapse/handlers/account_validity.py
@@ -0,0 +1,253 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import email.mime.multipart
+import email.utils
+import logging
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+from synapse.types import UserID
+from synapse.util import stringutils
+from synapse.util.logcontext import make_deferred_yieldable
+
+try:
+ from synapse.push.mailer import load_jinja2_templates
+except ImportError:
+ load_jinja2_templates = None
+
+logger = logging.getLogger(__name__)
+
+
+class AccountValidityHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.sendmail = self.hs.get_sendmail()
+ self.clock = self.hs.get_clock()
+
+ self._account_validity = self.hs.config.account_validity
+
+ if self._account_validity.renew_by_email_enabled and load_jinja2_templates:
+ # Don't do email-specific configuration if renewal by email is disabled.
+ try:
+ app_name = self.hs.config.email_app_name
+
+ self._subject = self._account_validity.renew_email_subject % {
+ "app": app_name,
+ }
+
+ self._from_string = self.hs.config.email_notif_from % {
+ "app": app_name,
+ }
+ except Exception:
+ # If substitution failed, fall back to the bare strings.
+ self._subject = self._account_validity.renew_email_subject
+ self._from_string = self.hs.config.email_notif_from
+
+ self._raw_from = email.utils.parseaddr(self._from_string)[1]
+
+ self._template_html, self._template_text = load_jinja2_templates(
+ config=self.hs.config,
+ template_html_name=self.hs.config.email_expiry_template_html,
+ template_text_name=self.hs.config.email_expiry_template_text,
+ )
+
+ # Check the renewal emails to send and send them every 30min.
+ self.clock.looping_call(
+ self.send_renewal_emails,
+ 30 * 60 * 1000,
+ )
+
+ @defer.inlineCallbacks
+ def send_renewal_emails(self):
+ """Gets the list of users whose account is expiring in the amount of time
+ configured in the ``renew_at`` parameter from the ``account_validity``
+ configuration, and sends renewal emails to all of these users as long as they
+ have an email 3PID attached to their account.
+ """
+ expiring_users = yield self.store.get_users_expiring_soon()
+
+ if expiring_users:
+ for user in expiring_users:
+ yield self._send_renewal_email(
+ user_id=user["user_id"],
+ expiration_ts=user["expiration_ts_ms"],
+ )
+
+ @defer.inlineCallbacks
+ def send_renewal_email_to_user(self, user_id):
+ expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
+ yield self._send_renewal_email(user_id, expiration_ts)
+
+ @defer.inlineCallbacks
+ def _send_renewal_email(self, user_id, expiration_ts):
+ """Sends out a renewal email to every email address attached to the given user
+ with a unique link allowing them to renew their account.
+
+ Args:
+ user_id (str): ID of the user to send email(s) to.
+ expiration_ts (int): Timestamp in milliseconds for the expiration date of
+ this user's account (used in the email templates).
+ """
+ addresses = yield self._get_email_addresses_for_user(user_id)
+
+ # Stop right here if the user doesn't have at least one email address.
+ # In this case, they will have to ask their server admin to renew their
+ # account manually.
+ if not addresses:
+ return
+
+ try:
+ user_display_name = yield self.store.get_profile_displayname(
+ UserID.from_string(user_id).localpart
+ )
+ if user_display_name is None:
+ user_display_name = user_id
+ except StoreError:
+ user_display_name = user_id
+
+ renewal_token = yield self._get_renewal_token(user_id)
+ url = "%s_matrix/client/unstable/account_validity/renew?token=%s" % (
+ self.hs.config.public_baseurl,
+ renewal_token,
+ )
+
+ template_vars = {
+ "display_name": user_display_name,
+ "expiration_ts": expiration_ts,
+ "url": url,
+ }
+
+ html_text = self._template_html.render(**template_vars)
+ html_part = MIMEText(html_text, "html", "utf8")
+
+ plain_text = self._template_text.render(**template_vars)
+ text_part = MIMEText(plain_text, "plain", "utf8")
+
+ for address in addresses:
+ raw_to = email.utils.parseaddr(address)[1]
+
+ multipart_msg = MIMEMultipart('alternative')
+ multipart_msg['Subject'] = self._subject
+ multipart_msg['From'] = self._from_string
+ multipart_msg['To'] = address
+ multipart_msg['Date'] = email.utils.formatdate()
+ multipart_msg['Message-ID'] = email.utils.make_msgid()
+ multipart_msg.attach(text_part)
+ multipart_msg.attach(html_part)
+
+ logger.info("Sending renewal email to %s", address)
+
+ yield make_deferred_yieldable(self.sendmail(
+ self.hs.config.email_smtp_host,
+ self._raw_from, raw_to, multipart_msg.as_string().encode('utf8'),
+ reactor=self.hs.get_reactor(),
+ port=self.hs.config.email_smtp_port,
+ requireAuthentication=self.hs.config.email_smtp_user is not None,
+ username=self.hs.config.email_smtp_user,
+ password=self.hs.config.email_smtp_pass,
+ requireTransportSecurity=self.hs.config.require_transport_security
+ ))
+
+ yield self.store.set_renewal_mail_status(
+ user_id=user_id,
+ email_sent=True,
+ )
+
+ @defer.inlineCallbacks
+ def _get_email_addresses_for_user(self, user_id):
+ """Retrieve the list of email addresses attached to a user's account.
+
+ Args:
+ user_id (str): ID of the user to lookup email addresses for.
+
+ Returns:
+ defer.Deferred[list[str]]: Email addresses for this account.
+ """
+ threepids = yield self.store.user_get_threepids(user_id)
+
+ addresses = []
+ for threepid in threepids:
+ if threepid["medium"] == "email":
+ addresses.append(threepid["address"])
+
+ defer.returnValue(addresses)
+
+ @defer.inlineCallbacks
+ def _get_renewal_token(self, user_id):
+ """Generates a 32-byte long random string that will be inserted into the
+ user's renewal email's unique link, then saves it into the database.
+
+ Args:
+ user_id (str): ID of the user to generate a string for.
+
+ Returns:
+ defer.Deferred[str]: The generated string.
+
+ Raises:
+ StoreError(500): Couldn't generate a unique string after 5 attempts.
+ """
+ attempts = 0
+ while attempts < 5:
+ try:
+ renewal_token = stringutils.random_string(32)
+ yield self.store.set_renewal_token_for_user(user_id, renewal_token)
+ defer.returnValue(renewal_token)
+ except StoreError:
+ attempts += 1
+ raise StoreError(500, "Couldn't generate a unique string as refresh string.")
+
+ @defer.inlineCallbacks
+ def renew_account(self, renewal_token):
+ """Renews the account attached to a given renewal token by pushing back the
+ expiration date by the current validity period in the server's configuration.
+
+ Args:
+ renewal_token (str): Token sent with the renewal request.
+ """
+ user_id = yield self.store.get_user_from_renewal_token(renewal_token)
+ logger.debug("Renewing an account for user %s", user_id)
+ yield self.renew_account_for_user(user_id)
+
+ @defer.inlineCallbacks
+ def renew_account_for_user(self, user_id, expiration_ts=None, email_sent=False):
+ """Renews the account attached to a given user by pushing back the
+ expiration date by the current validity period in the server's
+ configuration.
+
+ Args:
+ renewal_token (str): Token sent with the renewal request.
+ expiration_ts (int): New expiration date. Defaults to now + validity period.
+ email_sent (bool): Whether an email has been sent for this validity period.
+ Defaults to False.
+
+ Returns:
+ defer.Deferred[int]: New expiration date for this account, as a timestamp
+ in milliseconds since epoch.
+ """
+ if expiration_ts is None:
+ expiration_ts = self.clock.time_msec() + self._account_validity.period
+
+ yield self.store.set_account_validity_for_user(
+ user_id=user_id,
+ expiration_ts=expiration_ts,
+ email_sent=email_sent,
+ )
+
+ defer.returnValue(expiration_ts)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4544de821d..aa5d89a9ac 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -912,7 +912,7 @@ class AuthHandler(BaseHandler):
)
@defer.inlineCallbacks
- def delete_threepid(self, user_id, medium, address):
+ def delete_threepid(self, user_id, medium, address, id_server=None):
"""Attempts to unbind the 3pid on the identity servers and deletes it
from the local database.
@@ -920,6 +920,10 @@ class AuthHandler(BaseHandler):
user_id (str)
medium (str)
address (str)
+ id_server (str|None): Use the given identity server when unbinding
+ any threepids. If None then will attempt to unbind using the
+ identity server specified when binding (if known).
+
Returns:
Deferred[bool]: Returns True if successfully unbound the 3pid on
@@ -937,6 +941,7 @@ class AuthHandler(BaseHandler):
{
'medium': medium,
'address': address,
+ 'id_server': id_server,
},
)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 97d3f31d98..6a91f7698e 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -43,12 +43,15 @@ class DeactivateAccountHandler(BaseHandler):
hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
- def deactivate_account(self, user_id, erase_data):
+ def deactivate_account(self, user_id, erase_data, id_server=None):
"""Deactivate a user's account
Args:
user_id (str): ID of user to be deactivated
erase_data (bool): whether to GDPR-erase the user's data
+ id_server (str|None): Use the given identity server when unbinding
+ any threepids. If None then will attempt to unbind using the
+ identity server specified when binding (if known).
Returns:
Deferred[bool]: True if identity server supports removing
@@ -74,6 +77,7 @@ class DeactivateAccountHandler(BaseHandler):
{
'medium': threepid['medium'],
'address': threepid['address'],
+ 'id_server': id_server,
},
)
identity_server_supports_unbinding &= result
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index fe128d9c88..50c587aa61 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -36,6 +36,7 @@ logger = logging.getLogger(__name__)
class DirectoryHandler(BaseHandler):
+ MAX_ALIAS_LENGTH = 255
def __init__(self, hs):
super(DirectoryHandler, self).__init__(hs)
@@ -43,8 +44,10 @@ class DirectoryHandler(BaseHandler):
self.state = hs.get_state_handler()
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
+ self.store = hs.get_datastore()
self.config = hs.config
self.enable_room_list_search = hs.config.enable_room_list_search
+ self.require_membership = hs.config.require_membership_for_aliases
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -68,7 +71,7 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
- users = yield self.state.get_current_user_in_room(room_id)
+ users = yield self.state.get_current_users_in_room(room_id)
servers = set(get_domain_from_id(u) for u in users)
if not servers:
@@ -83,7 +86,7 @@ class DirectoryHandler(BaseHandler):
@defer.inlineCallbacks
def create_association(self, requester, room_alias, room_id, servers=None,
- send_event=True):
+ send_event=True, check_membership=True):
"""Attempt to create a new alias
Args:
@@ -93,6 +96,8 @@ class DirectoryHandler(BaseHandler):
servers (list[str]|None): List of servers that others servers
should try and join via
send_event (bool): Whether to send an updated m.room.aliases event
+ check_membership (bool): Whether to check if the user is in the room
+ before the alias can be set (if the server's config requires it).
Returns:
Deferred
@@ -100,6 +105,13 @@ class DirectoryHandler(BaseHandler):
user_id = requester.user.to_string()
+ if len(room_alias.to_string()) > self.MAX_ALIAS_LENGTH:
+ raise SynapseError(
+ 400,
+ "Can't create aliases longer than %s characters" % self.MAX_ALIAS_LENGTH,
+ Codes.INVALID_PARAM,
+ )
+
service = requester.app_service
if service:
if not service.is_interested_in_alias(room_alias.to_string()):
@@ -108,6 +120,14 @@ class DirectoryHandler(BaseHandler):
" this kind of alias.", errcode=Codes.EXCLUSIVE
)
else:
+ if self.require_membership and check_membership:
+ rooms_for_user = yield self.store.get_rooms_for_user(user_id)
+ if room_id not in rooms_for_user:
+ raise AuthError(
+ 403,
+ "You must be in the room to create an alias for it",
+ )
+
if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
raise AuthError(
403, "This user is not permitted to create this alias",
@@ -268,7 +288,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND
)
- users = yield self.state.get_current_user_in_room(room_id)
+ users = yield self.state.get_current_users_in_room(room_id)
extra_servers = set(get_domain_from_id(u) for u in users)
servers = set(extra_servers) | set(servers)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d883e98381..1b4d8c74ae 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -102,7 +102,7 @@ class EventStreamHandler(BaseHandler):
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
- users = yield self.state.get_current_user_in_room(event.room_id)
+ users = yield self.state.get_current_users_in_room(event.room_id)
states = yield presence_handler.get_states(
users,
as_event=True,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9eaf2d3e18..0684778882 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -29,13 +29,7 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
-from synapse.api.constants import (
- KNOWN_ROOM_VERSIONS,
- EventTypes,
- Membership,
- RejectedReason,
- RoomVersions,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -44,6 +38,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import auth_types_for_event
from synapse.events.validator import EventValidator
@@ -1733,7 +1728,9 @@ class FederationHandler(BaseHandler):
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
- room_version = create_event.content.get("room_version", RoomVersions.V1)
+ room_version = create_event.content.get(
+ "room_version", RoomVersions.V1.identifier,
+ )
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 39184f0e22..22469486d7 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -132,6 +132,14 @@ class IdentityHandler(BaseHandler):
}
)
logger.debug("bound threepid %r to %s", creds, mxid)
+
+ # Remember where we bound the threepid
+ yield self.store.add_user_bound_threepid(
+ user_id=mxid,
+ medium=data["medium"],
+ address=data["address"],
+ id_server=id_server,
+ )
except CodeMessageException as e:
data = json.loads(e.msg) # XXX WAT?
defer.returnValue(data)
@@ -142,30 +150,61 @@ class IdentityHandler(BaseHandler):
Args:
mxid (str): Matrix user ID of binding to be removed
- threepid (dict): Dict with medium & address of binding to be removed
+ threepid (dict): Dict with medium & address of binding to be
+ removed, and an optional id_server.
Raises:
SynapseError: If we failed to contact the identity server
Returns:
Deferred[bool]: True on success, otherwise False if the identity
- server doesn't support unbinding
+ server doesn't support unbinding (or no identity server found to
+ contact).
"""
- logger.debug("unbinding threepid %r from %s", threepid, mxid)
- if not self.trusted_id_servers:
- logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+ if threepid.get("id_server"):
+ id_servers = [threepid["id_server"]]
+ else:
+ id_servers = yield self.store.get_id_servers_user_bound(
+ user_id=mxid,
+ medium=threepid["medium"],
+ address=threepid["address"],
+ )
+
+ # We don't know where to unbind, so we don't have a choice but to return
+ if not id_servers:
defer.returnValue(False)
- # We don't track what ID server we added 3pids on (perhaps we ought to)
- # but we assume that any of the servers in the trusted list are in the
- # same ID server federation, so we can pick any one of them to send the
- # deletion request to.
- id_server = next(iter(self.trusted_id_servers))
+ changed = True
+ for id_server in id_servers:
+ changed &= yield self.try_unbind_threepid_with_id_server(
+ mxid, threepid, id_server,
+ )
+
+ defer.returnValue(changed)
+
+ @defer.inlineCallbacks
+ def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):
+ """Removes a binding from an identity server
+ Args:
+ mxid (str): Matrix user ID of binding to be removed
+ threepid (dict): Dict with medium & address of binding to be removed
+ id_server (str): Identity server to unbind from
+
+ Raises:
+ SynapseError: If we failed to contact the identity server
+
+ Returns:
+ Deferred[bool]: True on success, otherwise False if the identity
+ server doesn't support unbinding
+ """
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
content = {
"mxid": mxid,
- "threepid": threepid,
+ "threepid": {
+ "medium": threepid["medium"],
+ "address": threepid["address"],
+ },
}
# we abuse the federation http client to sign the request, but we have to send it
@@ -188,16 +227,24 @@ class IdentityHandler(BaseHandler):
content,
headers,
)
+ changed = True
except HttpResponseException as e:
+ changed = False
if e.code in (400, 404, 501,):
# The remote server probably doesn't support unbinding (yet)
logger.warn("Received %d response while unbinding threepid", e.code)
- defer.returnValue(False)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(502, "Failed to contact identity server")
- defer.returnValue(True)
+ yield self.store.remove_user_bound_threepid(
+ user_id=mxid,
+ medium=threepid["medium"],
+ address=threepid["address"],
+ id_server=id_server,
+ )
+
+ defer.returnValue(changed)
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 9b41c7b205..224d34ef3a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -30,6 +30,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
+from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -191,7 +192,7 @@ class MessageHandler(object):
"Getting joined members after leaving is not implemented"
)
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
+ users_with_profile = yield self.state.get_current_users_in_room(room_id)
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
@@ -603,7 +604,9 @@ class EventCreationHandler(object):
"""
if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
- room_version = event.content.get("room_version", RoomVersions.V1)
+ room_version = event.content.get(
+ "room_version", RoomVersions.V1.identifier
+ )
else:
room_version = yield self.store.get_room_version(event.room_id)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..59d53f1050 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
+ self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
@@ -110,30 +113,6 @@ class PresenceHandler(object):
federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
- federation_registry.register_edu_handler(
- "m.presence_invite",
- lambda origin, content: self.invite_presence(
- observed_user=UserID.from_string(content["observed_user"]),
- observer_user=UserID.from_string(content["observer_user"]),
- )
- )
- federation_registry.register_edu_handler(
- "m.presence_accept",
- lambda origin, content: self.accept_presence(
- observed_user=UserID.from_string(content["observed_user"]),
- observer_user=UserID.from_string(content["observer_user"]),
- )
- )
- federation_registry.register_edu_handler(
- "m.presence_deny",
- lambda origin, content: self.deny_presence(
- observed_user=UserID.from_string(content["observed_user"]),
- observer_user=UserID.from_string(content["observer_user"]),
- )
- )
-
- distributor = hs.get_distributor()
- distributor.observe("user_joined_room", self.user_joined_room)
active_presence = self.store.take_presence_startup_info()
@@ -220,6 +199,15 @@ class PresenceHandler(object):
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
+ # Used to handle sending of presence to newly joined users/servers
+ if hs.config.use_presence:
+ self.notifier.add_replication_callback(self.notify_new_event)
+
+ # Presence is best effort and quickly heals itself, so lets just always
+ # stream from the current state when we restart.
+ self._event_pos = self.store.get_current_events_token()
+ self._event_processing = False
+
@defer.inlineCallbacks
def _on_shutdown(self):
"""Gets called when shutting down. This lets us persist any updates that
@@ -751,199 +739,178 @@ class PresenceHandler(object):
yield self._update_states([prev_state.copy_and_replace(**new_fields)])
@defer.inlineCallbacks
- def user_joined_room(self, user, room_id):
- """Called (via the distributor) when a user joins a room. This funciton
- sends presence updates to servers, either:
- 1. the joining user is a local user and we send their presence to
- all servers in the room.
- 2. the joining user is a remote user and so we send presence for all
- local users in the room.
+ def is_visible(self, observed_user, observer_user):
+ """Returns whether a user can see another user's presence.
"""
- # We only need to send presence to servers that don't have it yet. We
- # don't need to send to local clients here, as that is done as part
- # of the event stream/sync.
- # TODO: Only send to servers not already in the room.
- if self.is_mine(user):
- state = yield self.current_state_for_user(user.to_string())
-
- self._push_to_remotes([state])
- else:
- user_ids = yield self.store.get_users_in_room(room_id)
- user_ids = list(filter(self.is_mine_id, user_ids))
+ observer_room_ids = yield self.store.get_rooms_for_user(
+ observer_user.to_string()
+ )
+ observed_room_ids = yield self.store.get_rooms_for_user(
+ observed_user.to_string()
+ )
- states = yield self.current_state_for_users(user_ids)
+ if observer_room_ids & observed_room_ids:
+ defer.returnValue(True)
- self._push_to_remotes(list(states.values()))
+ defer.returnValue(False)
@defer.inlineCallbacks
- def get_presence_list(self, observer_user, accepted=None):
- """Returns the presence for all users in their presence list.
+ def get_all_presence_updates(self, last_id, current_id):
"""
- if not self.is_mine(observer_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
-
- presence_list = yield self.store.get_presence_list(
- observer_user.localpart, accepted=accepted
- )
+ Gets a list of presence update rows from between the given stream ids.
+ Each row has:
+ - stream_id(str)
+ - user_id(str)
+ - state(str)
+ - last_active_ts(int)
+ - last_federation_update_ts(int)
+ - last_user_sync_ts(int)
+ - status_msg(int)
+ - currently_active(int)
+ """
+ # TODO(markjh): replicate the unpersisted changes.
+ # This could use the in-memory stores for recent changes.
+ rows = yield self.store.get_all_presence_updates(last_id, current_id)
+ defer.returnValue(rows)
- results = yield self.get_states(
- target_user_ids=[row["observed_user_id"] for row in presence_list],
- as_event=False,
- )
+ def notify_new_event(self):
+ """Called when new events have happened. Handles users and servers
+ joining rooms and require being sent presence.
+ """
- now = self.clock.time_msec()
- results[:] = [format_user_presence_state(r, now) for r in results]
+ if self._event_processing:
+ return
- is_accepted = {
- row["observed_user_id"]: row["accepted"] for row in presence_list
- }
+ @defer.inlineCallbacks
+ def _process_presence():
+ assert not self._event_processing
- for result in results:
- result.update({
- "accepted": is_accepted,
- })
+ self._event_processing = True
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._event_processing = False
- defer.returnValue(results)
+ run_as_background_process("presence.notify_new_event", _process_presence)
@defer.inlineCallbacks
- def send_presence_invite(self, observer_user, observed_user):
- """Sends a presence invite.
- """
- yield self.store.add_presence_list_pending(
- observer_user.localpart, observed_user.to_string()
- )
+ def _unsafe_process(self):
+ # Loop round handling deltas until we're up to date
+ while True:
+ with Measure(self.clock, "presence_delta"):
+ deltas = yield self.store.get_current_state_deltas(self._event_pos)
+ if not deltas:
+ return
- if self.is_mine(observed_user):
- yield self.invite_presence(observed_user, observer_user)
- else:
- yield self.federation.build_and_send_edu(
- destination=observed_user.domain,
- edu_type="m.presence_invite",
- content={
- "observed_user": observed_user.to_string(),
- "observer_user": observer_user.to_string(),
- }
- )
+ yield self._handle_state_delta(deltas)
+
+ self._event_pos = deltas[-1]["stream_id"]
+
+ # Expose current event processing position to prometheus
+ synapse.metrics.event_processing_positions.labels("presence").set(
+ self._event_pos
+ )
@defer.inlineCallbacks
- def invite_presence(self, observed_user, observer_user):
- """Handles new presence invites.
+ def _handle_state_delta(self, deltas):
+ """Process current state deltas to find new joins that need to be
+ handled.
"""
- if not self.is_mine(observed_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ prev_event_id = delta["prev_event_id"]
- # TODO: Don't auto accept
- if self.is_mine(observer_user):
- yield self.accept_presence(observed_user, observer_user)
- else:
- self.federation.build_and_send_edu(
- destination=observer_user.domain,
- edu_type="m.presence_accept",
- content={
- "observed_user": observed_user.to_string(),
- "observer_user": observer_user.to_string(),
- }
- )
+ logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
- state_dict = yield self.get_state(observed_user, as_event=False)
- state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
+ if typ != EventTypes.Member:
+ continue
- self.federation.build_and_send_edu(
- destination=observer_user.domain,
- edu_type="m.presence",
- content={
- "push": [state_dict]
- }
- )
+ if event_id is None:
+ # state has been deleted, so this is not a join. We only care about
+ # joins.
+ continue
- @defer.inlineCallbacks
- def accept_presence(self, observed_user, observer_user):
- """Handles a m.presence_accept EDU. Mark a presence invite from a
- local or remote user as accepted in a local user's presence list.
- Starts polling for presence updates from the local or remote user.
- Args:
- observed_user(UserID): The user to update in the presence list.
- observer_user(UserID): The owner of the presence list to update.
- """
- yield self.store.set_presence_list_accepted(
- observer_user.localpart, observed_user.to_string()
- )
+ event = yield self.store.get_event(event_id)
+ if event.content.get("membership") != Membership.JOIN:
+ # We only care about joins
+ continue
- @defer.inlineCallbacks
- def deny_presence(self, observed_user, observer_user):
- """Handle a m.presence_deny EDU. Removes a local or remote user from a
- local user's presence list.
- Args:
- observed_user(UserID): The local or remote user to remove from the
- list.
- observer_user(UserID): The local owner of the presence list.
- Returns:
- A Deferred.
- """
- yield self.store.del_presence_list(
- observer_user.localpart, observed_user.to_string()
- )
+ if prev_event_id:
+ prev_event = yield self.store.get_event(prev_event_id)
+ if prev_event.content.get("membership") == Membership.JOIN:
+ # Ignore changes to join events.
+ continue
- # TODO(paul): Inform the user somehow?
+ yield self._on_user_joined_room(room_id, state_key)
@defer.inlineCallbacks
- def drop(self, observed_user, observer_user):
- """Remove a local or remote user from a local user's presence list and
- unsubscribe the local user from updates that user.
+ def _on_user_joined_room(self, room_id, user_id):
+ """Called when we detect a user joining the room via the current state
+ delta stream.
+
Args:
- observed_user(UserId): The local or remote user to remove from the
- list.
- observer_user(UserId): The local owner of the presence list.
+ room_id (str)
+ user_id (str)
+
Returns:
- A Deferred.
+ Deferred
"""
- if not self.is_mine(observer_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
- yield self.store.del_presence_list(
- observer_user.localpart, observed_user.to_string()
- )
+ if self.is_mine_id(user_id):
+ # If this is a local user then we need to send their presence
+ # out to hosts in the room (who don't already have it)
- # TODO: Inform the remote that we've dropped the presence list.
+ # TODO: We should be able to filter the hosts down to those that
+ # haven't previously seen the user
- @defer.inlineCallbacks
- def is_visible(self, observed_user, observer_user):
- """Returns whether a user can see another user's presence.
- """
- observer_room_ids = yield self.store.get_rooms_for_user(
- observer_user.to_string()
- )
- observed_room_ids = yield self.store.get_rooms_for_user(
- observed_user.to_string()
- )
+ state = yield self.current_state_for_user(user_id)
+ hosts = yield self.state.get_current_hosts_in_room(room_id)
- if observer_room_ids & observed_room_ids:
- defer.returnValue(True)
+ # Filter out ourselves.
+ hosts = set(host for host in hosts if host != self.server_name)
- accepted_observers = yield self.store.get_presence_list_observers_accepted(
- observed_user.to_string()
- )
+ self.federation.send_presence_to_destinations(
+ states=[state],
+ destinations=hosts,
+ )
+ else:
+ # A remote user has joined the room, so we need to:
+ # 1. Check if this is a new server in the room
+ # 2. If so send any presence they don't already have for
+ # local users in the room.
- defer.returnValue(observer_user.to_string() in accepted_observers)
+ # TODO: We should be able to filter the users down to those that
+ # the server hasn't previously seen
- @defer.inlineCallbacks
- def get_all_presence_updates(self, last_id, current_id):
- """
- Gets a list of presence update rows from between the given stream ids.
- Each row has:
- - stream_id(str)
- - user_id(str)
- - state(str)
- - last_active_ts(int)
- - last_federation_update_ts(int)
- - last_user_sync_ts(int)
- - status_msg(int)
- - currently_active(int)
- """
- # TODO(markjh): replicate the unpersisted changes.
- # This could use the in-memory stores for recent changes.
- rows = yield self.store.get_all_presence_updates(last_id, current_id)
- defer.returnValue(rows)
+ # TODO: Check that this is actually a new server joining the
+ # room.
+
+ user_ids = yield self.state.get_current_users_in_room(room_id)
+ user_ids = list(filter(self.is_mine_id, user_ids))
+
+ states = yield self.current_state_for_users(user_ids)
+
+ # Filter out old presence, i.e. offline presence states where
+ # the user hasn't been active for a week. We can change this
+ # depending on what we want the UX to be, but at the least we
+ # should filter out offline presence where the state is just the
+ # default state.
+ now = self.clock.time_msec()
+ states = [
+ state for state in states.values()
+ if state.state != PresenceState.OFFLINE
+ or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+ or state.status_msg is not None
+ ]
+
+ if states:
+ self.federation.send_presence_to_destinations(
+ states=states,
+ destinations=[get_domain_from_id(user_id)],
+ )
def should_notify(old_state, new_state):
@@ -1086,10 +1053,7 @@ class PresenceEventSource(object):
updates for
"""
user_id = user.to_string()
- plist = yield self.store.get_presence_list_accepted(
- user.localpart, on_invalidate=cache_context.invalidate,
- )
- users_interested_in = set(row["observed_user_id"] for row in plist)
+ users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
@@ -1294,10 +1258,6 @@ def get_interested_parties(store, states):
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)
- plist = yield store.get_presence_list_observers_accepted(state.user_id)
- for u in plist:
- users_to_states.setdefault(u, []).append(state)
-
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 58940e0320..a51d11a257 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -153,6 +153,7 @@ class RegistrationHandler(BaseHandler):
user_type=None,
default_display_name=None,
address=None,
+ bind_emails=[],
):
"""Registers a new client on the server.
@@ -172,6 +173,7 @@ class RegistrationHandler(BaseHandler):
default_display_name (unicode|None): if set, the new user's displayname
will be set to this. Defaults to 'localpart'.
address (str|None): the IP address used to perform the registration.
+ bind_emails (List[str]): list of emails to bind to this account.
Returns:
A tuple of (user_id, access_token).
Raises:
@@ -261,6 +263,21 @@ class RegistrationHandler(BaseHandler):
if not self.hs.config.user_consent_at_registration:
yield self._auto_join_rooms(user_id)
+ # Bind any specified emails to this account
+ current_time = self.hs.get_clock().time_msec()
+ for email in bind_emails:
+ # generate threepid dict
+ threepid_dict = {
+ "medium": "email",
+ "address": email,
+ "validated_at": current_time,
+ }
+
+ # Bind email to new account
+ yield self._register_email_threepid(
+ user_id, threepid_dict, None, False,
+ )
+
defer.returnValue((user_id, token))
@defer.inlineCallbacks
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 67b15697fd..e37ae96899 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,14 +25,9 @@ from six import iteritems, string_types
from twisted.internet import defer
-from synapse.api.constants import (
- DEFAULT_ROOM_VERSION,
- KNOWN_ROOM_VERSIONS,
- EventTypes,
- JoinRules,
- RoomCreationPreset,
-)
+from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
+from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -285,6 +280,7 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.RoomAvatar, ""),
(EventTypes.Encryption, ""),
(EventTypes.ServerACL, ""),
+ (EventTypes.RelatedGroups, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -406,7 +402,7 @@ class RoomCreationHandler(BaseHandler):
yield directory_handler.create_association(
requester, RoomAlias.from_string(alias),
new_room_id, servers=(self.hs.hostname, ),
- send_event=False,
+ send_event=False, check_membership=False,
)
logger.info("Moved alias %s to new room", alias)
except SynapseError as e:
@@ -479,7 +475,7 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
- room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+ room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
if not isinstance(room_version, string_types):
raise SynapseError(
400,
@@ -542,6 +538,7 @@ class RoomCreationHandler(BaseHandler):
room_alias=room_alias,
servers=[self.hs.hostname],
send_event=False,
+ check_membership=False,
)
preset_config = config.get(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index d6c9d56007..617d1c9ef8 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -167,7 +167,7 @@ class RoomListHandler(BaseHandler):
if not latest_event_ids:
return
- joined_users = yield self.state_handler.get_current_user_in_room(
+ joined_users = yield self.state_handler.get_current_users_in_room(
room_id, latest_event_ids,
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 71ce5b54e5..024d6db27a 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -70,6 +70,7 @@ class RoomMemberHandler(object):
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
self._server_notices_mxid = self.config.server_notices_mxid
+ self._enable_lookup = hs.config.enable_3pid_lookup
@abc.abstractmethod
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -421,6 +422,9 @@ class RoomMemberHandler(object):
room_id, latest_event_ids=latest_event_ids,
)
+ # TODO: Refactor into dictionary of explicitly allowed transitions
+ # between old and new state, with specific error messages for some
+ # transitions and generic otherwise
old_state_id = current_state_ids.get((EventTypes.Member, target.to_string()))
if old_state_id:
old_state = yield self.store.get_event(old_state_id, allow_none=True)
@@ -446,6 +450,9 @@ class RoomMemberHandler(object):
if same_sender and same_membership and same_content:
defer.returnValue(old_state)
+ if old_membership in ["ban", "leave"] and action == "kick":
+ raise AuthError(403, "The target user is not in the room")
+
# we don't allow people to reject invites to the server notice
# room, but they can leave it once they are joined.
if (
@@ -459,6 +466,9 @@ class RoomMemberHandler(object):
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
+ else:
+ if action == "kick":
+ raise AuthError(403, "The target user is not in the room")
is_host_in_room = yield self._is_host_in_room(current_state_ids)
@@ -729,6 +739,10 @@ class RoomMemberHandler(object):
Returns:
str: the matrix ID of the 3pid, or None if it is not recognized.
"""
+ if not self._enable_lookup:
+ raise SynapseError(
+ 403, "Looking up third-party identifiers is denied from this server",
+ )
try:
data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 57bb996245..153312e39f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1049,11 +1049,11 @@ class SyncHandler(object):
# TODO: Be more clever than this, i.e. remove users who we already
# share a room with?
for room_id in newly_joined_rooms:
- joined_users = yield self.state.get_current_user_in_room(room_id)
+ joined_users = yield self.state.get_current_users_in_room(room_id)
newly_joined_users.update(joined_users)
for room_id in newly_left_rooms:
- left_users = yield self.state.get_current_user_in_room(room_id)
+ left_users = yield self.state.get_current_users_in_room(room_id)
newly_left_users.update(left_users)
# TODO: Check that these users are actually new, i.e. either they
@@ -1213,7 +1213,7 @@ class SyncHandler(object):
extra_users_ids = set(newly_joined_users)
for room_id in newly_joined_rooms:
- users = yield self.state.get_current_user_in_room(room_id)
+ users = yield self.state.get_current_users_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())
@@ -1855,7 +1855,7 @@ class SyncHandler(object):
extrems = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering,
)
- users_in_room = yield self.state.get_current_user_in_room(
+ users_in_room = yield self.state.get_current_users_in_room(
room_id, extrems,
)
if user_id in users_in_room:
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 39df960c31..972662eb48 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,7 @@ class TypingHandler(object):
@defer.inlineCallbacks
def _push_remote(self, member, typing):
try:
- users = yield self.state.get_current_user_in_room(member.room_id)
+ users = yield self.state.get_current_users_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec()
@@ -261,7 +261,7 @@ class TypingHandler(object):
)
return
- users = yield self.state.get_current_user_in_room(room_id)
+ users = yield self.state.get_current_users_in_room(room_id)
domains = set(get_domain_from_id(u) for u in users)
if self.server_name in domains:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index b689979b4b..5de9630950 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -276,7 +276,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# ignore the change
return
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
+ users_with_profile = yield self.state.get_current_users_in_room(room_id)
# Remove every user from the sharing tables for that room.
for user_id in iterkeys(users_with_profile):
@@ -325,7 +325,7 @@ class UserDirectoryHandler(StateDeltasHandler):
room_id
)
# Now we update users who share rooms with users.
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
+ users_with_profile = yield self.state.get_current_users_in_room(room_id)
if is_public:
yield self.store.add_users_in_public_rooms(room_id, (user_id,))
|