diff options
Diffstat (limited to '')
60 files changed, 2651 insertions, 538 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 9e912fdfbe..44e38b777a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError -from synapse.types import Requester, RoomID, UserID, EventID +from synapse.types import Requester, UserID, get_domian_from_id from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn from synapse.util.metrics import Measure @@ -91,8 +91,8 @@ class Auth(object): "Room %r does not exist" % (event.room_id,) ) - creating_domain = RoomID.from_string(event.room_id).domain - originating_domain = UserID.from_string(event.sender).domain + creating_domain = get_domian_from_id(event.room_id) + originating_domain = get_domian_from_id(event.sender) if creating_domain != originating_domain: if not self.can_federate(event, auth_events): raise AuthError( @@ -219,7 +219,7 @@ class Auth(object): for event in curr_state.values(): if event.type == EventTypes.Member: try: - if UserID.from_string(event.state_key).domain != host: + if get_domian_from_id(event.state_key) != host: continue except: logger.warn("state_key not user_id: %s", event.state_key) @@ -266,8 +266,8 @@ class Auth(object): target_user_id = event.state_key - creating_domain = RoomID.from_string(event.room_id).domain - target_domain = UserID.from_string(target_user_id).domain + creating_domain = get_domian_from_id(event.room_id) + target_domain = get_domian_from_id(target_user_id) if creating_domain != target_domain: if not self.can_federate(event, auth_events): raise AuthError( @@ -612,7 +612,8 @@ class Auth(object): def get_user_from_macaroon(self, macaroon_str): try: macaroon = pymacaroons.Macaroon.deserialize(macaroon_str) - self.validate_macaroon(macaroon, "access", False) + + self.validate_macaroon(macaroon, "access", self.hs.config.expire_access_token) user_prefix = "user_id = " user = None @@ -889,8 +890,8 @@ class Auth(object): if user_level >= redact_level: return False - redacter_domain = EventID.from_string(event.event_id).domain - redactee_domain = EventID.from_string(event.redacts).domain + redacter_domain = get_domian_from_id(event.event_id) + redactee_domain = get_domian_from_id(event.redacts) if redacter_domain == redactee_domain: return True diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index cd699ef27f..4f5a4281fa 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -15,6 +15,8 @@ from synapse.api.errors import SynapseError from synapse.types import UserID, RoomID +from twisted.internet import defer + import ujson as json @@ -24,10 +26,10 @@ class Filtering(object): super(Filtering, self).__init__() self.store = hs.get_datastore() + @defer.inlineCallbacks def get_user_filter(self, user_localpart, filter_id): - result = self.store.get_user_filter(user_localpart, filter_id) - result.addCallback(FilterCollection) - return result + result = yield self.store.get_user_filter(user_localpart, filter_id) + defer.returnValue(FilterCollection(result)) def add_user_filter(self, user_localpart, user_filter): self.check_valid_filter(user_filter) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index b5339f030d..135dd58c15 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -20,11 +20,14 @@ from synapse.server import HomeServer from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.storage.roommember import RoomMemberStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep @@ -58,6 +61,7 @@ class SlaveConfig(DatabaseConfig): self.soft_file_limit = config.get("soft_file_limit") self.daemonize = config.get("daemonize") self.pid_file = self.abspath(config.get("pid_file")) + self.public_baseurl = config["public_baseurl"] def default_config(self, server_name, **kwargs): pid_file = self.abspath("pusher.pid") @@ -91,12 +95,13 @@ class SlaveConfig(DatabaseConfig): """ % locals() -class PusherSlaveConfig(SlaveConfig, LoggingConfig): +class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): pass class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, + SlavedAccountDataStore ): update_pusher_last_stream_ordering_and_success = ( DataStore.update_pusher_last_stream_ordering_and_success.__func__ @@ -110,6 +115,31 @@ class PusherSlaveStore( DataStore.update_pusher_last_stream_ordering.__func__ ) + get_throttle_params_by_room = ( + DataStore.get_throttle_params_by_room.__func__ + ) + + set_throttle_params = ( + DataStore.set_throttle_params.__func__ + ) + + get_time_of_last_push_action_before = ( + DataStore.get_time_of_last_push_action_before.__func__ + ) + + get_profile_displayname = ( + DataStore.get_profile_displayname.__func__ + ) + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + class PusherServer(HomeServer): @@ -189,6 +219,7 @@ class PusherServer(HomeServer): store = self.get_datastore() replication_url = self.config.replication_url pusher_pool = self.get_pusherpool() + clock = self.get_clock() def stop_pusher(user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) @@ -240,11 +271,21 @@ class PusherServer(HomeServer): min_stream_id, max_stream_id, affected_room_ids ) + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 while True: try: args = store.stream_positions() args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) yield store.process_replication(result) poke_pushers(result) except: diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py new file mode 100644 index 0000000000..90bdd08f00 --- /dev/null +++ b/synapse/config/emailconfig.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file can't be called email.py because if it is, we cannot: +import email.utils + +from ._base import Config + + +class EmailConfig(Config): + def read_config(self, config): + self.email_enable_notifs = False + + email_config = config.get("email", {}) + self.email_enable_notifs = email_config.get("enable_notifs", False) + + if self.email_enable_notifs: + # make sure we can import the required deps + import jinja2 + import bleach + # prevent unused warnings + jinja2 + bleach + + required = [ + "smtp_host", + "smtp_port", + "notif_from", + "template_dir", + "notif_template_html", + "notif_template_text", + ] + + missing = [] + for k in required: + if k not in email_config: + missing.append(k) + + if (len(missing) > 0): + raise RuntimeError( + "email.enable_notifs is True but required keys are missing: %s" % + (", ".join(["email." + k for k in missing]),) + ) + + if config.get("public_baseurl") is None: + raise RuntimeError( + "email.enable_notifs is True but no public_baseurl is set" + ) + + self.email_smtp_host = email_config["smtp_host"] + self.email_smtp_port = email_config["smtp_port"] + self.email_notif_from = email_config["notif_from"] + self.email_template_dir = email_config["template_dir"] + self.email_notif_template_html = email_config["notif_template_html"] + self.email_notif_template_text = email_config["notif_template_text"] + self.email_notif_for_new_users = email_config.get( + "notif_for_new_users", True + ) + if "app_name" in email_config: + self.email_app_name = email_config["app_name"] + else: + self.email_app_name = "Matrix" + + # make sure it's valid + parsed = email.utils.parseaddr(self.email_notif_from) + if parsed[1] == '': + raise RuntimeError("Invalid notif_from address") + else: + self.email_enable_notifs = False + # Not much point setting defaults for the rest: it would be an + # error for them to be used. + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Enable sending emails for notification events + #email: + # enable_notifs: false + # smtp_host: "localhost" + # smtp_port: 25 + # notif_from: Your Friendly Matrix Home Server <noreply@example.com> + # app_name: Matrix + # template_dir: res/templates + # notif_template_html: notif_mail.html + # notif_template_text: notif_mail.txt + # notif_for_new_users: True + """ diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 9a80ac39ec..fc2445484c 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -31,13 +31,14 @@ from .cas import CasConfig from .password import PasswordConfig from .jwt import JWTConfig from .ldap import LDAPConfig +from .emailconfig import EmailConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, LDAPConfig, PasswordConfig,): + JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,): pass diff --git a/synapse/config/key.py b/synapse/config/key.py index a072aec714..6ee643793e 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -57,6 +57,8 @@ class KeyConfig(Config): seed = self.signing_key[0].seed self.macaroon_secret_key = hashlib.sha256(seed) + self.expire_access_token = config.get("expire_access_token", False) + def default_config(self, config_dir_path, server_name, is_generating_file=False, **kwargs): base_key_name = os.path.join(config_dir_path, server_name) @@ -69,6 +71,9 @@ class KeyConfig(Config): return """\ macaroon_secret_key: "%(macaroon_secret_key)s" + # Used to enable access token expiration. + expire_access_token: False + ## Signing Keys ## # Path to the signing key to sign messages with diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 87e500c97a..cc3f879857 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -32,6 +32,7 @@ class RegistrationConfig(Config): ) self.registration_shared_secret = config.get("registration_shared_secret") + self.user_creation_max_duration = int(config["user_creation_max_duration"]) self.bcrypt_rounds = config.get("bcrypt_rounds", 12) self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"] @@ -54,6 +55,11 @@ class RegistrationConfig(Config): # secret, even if registration is otherwise disabled. registration_shared_secret: "%(registration_shared_secret)s" + # Sets the expiry for the short term user creation in + # milliseconds. For instance the bellow duration is two weeks + # in milliseconds. + user_creation_max_duration: 1209600000 + # Set the number of bcrypt rounds used to generate password hash. # Larger numbers increase the work factor needed to generate the hash. # The default number of rounds is 12. diff --git a/synapse/config/server.py b/synapse/config/server.py index 46c633548a..0b5f462e44 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -28,6 +28,11 @@ class ServerConfig(Config): self.print_pidfile = config.get("print_pidfile") self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", True) + self.public_baseurl = config.get("public_baseurl") + + if self.public_baseurl is not None: + if self.public_baseurl[-1] != '/': + self.public_baseurl += '/' self.start_pushers = config.get("start_pushers", True) self.listeners = config.get("listeners", []) @@ -143,6 +148,9 @@ class ServerConfig(Config): # Whether to serve a web client from the HTTP/HTTPS root resource. web_client: True + # The public-facing base URL for the client API (not including _matrix/...) + # public_baseurl: https://example.com:8448/ + # Set the soft limit on the number of file descriptors synapse can use # Zero is used to indicate synapse should set the soft limit to the # hard limit. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 429ab6ddec..f1d231b9d8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -388,6 +388,11 @@ class FederationServer(FederationBase): }) @log_function + def on_openid_userinfo(self, token): + ts_now_ms = self._clock.time_msec() + return self.store.get_user_id_for_open_id_token(token, ts_now_ms) + + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1928da03b3..5787f854d4 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -20,6 +20,7 @@ from .persistence import TransactionActions from .units import Transaction from synapse.api.errors import HttpResponseException +from synapse.util.async import run_on_reactor from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.util.retryutils import ( @@ -199,6 +200,8 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + yield run_on_reactor() + # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index d65a7893d8..5b6c7d11dd 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request, parse_string from synapse.util.ratelimitutils import FederationRateLimiter import functools @@ -323,7 +323,7 @@ class FederationSendLeaveServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet): - PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)" + PATH = "/event_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)" def on_GET(self, origin, content, query, context, event_id): return self.handler.on_event_auth(origin, context, event_id) @@ -448,6 +448,50 @@ class On3pidBindServlet(BaseFederationServlet): return code +class OpenIdUserInfo(BaseFederationServlet): + """ + Exchange a bearer token for information about a user. + + The response format should be compatible with: + http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse + + GET /openid/userinfo?access_token=ABDEFGH HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "sub": "@userpart:example.org", + } + """ + + PATH = "/openid/userinfo" + + @defer.inlineCallbacks + def on_GET(self, request): + token = parse_string(request, "access_token") + if token is None: + defer.returnValue((401, { + "errcode": "M_MISSING_TOKEN", "error": "Access Token required" + })) + return + + user_id = yield self.handler.on_openid_userinfo(token) + + if user_id is None: + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) + + defer.returnValue((200, {"sub": user_id})) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -468,6 +512,7 @@ SERVLET_CLASSES = ( FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, + OpenIdUserInfo, ) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 13a675b208..c904c6c500 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -15,13 +15,10 @@ from twisted.internet import defer -from synapse.api.errors import LimitExceededError, SynapseError, AuthError -from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.api.errors import LimitExceededError from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, RoomAlias, Requester -from synapse.push.action_generator import ActionGenerator +from synapse.types import UserID, Requester -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn import logging @@ -29,23 +26,6 @@ import logging logger = logging.getLogger(__name__) -VISIBILITY_PRIORITY = ( - "world_readable", - "shared", - "invited", - "joined", -) - - -MEMBERSHIP_PRIORITY = ( - Membership.JOIN, - Membership.INVITE, - Membership.KNOCK, - Membership.LEAVE, - Membership.BAN, -) - - class BaseHandler(object): """ Common base class for the event handlers. @@ -65,161 +45,10 @@ class BaseHandler(object): self.clock = hs.get_clock() self.hs = hs - self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.event_builder_factory = hs.get_event_builder_factory() - @defer.inlineCallbacks - def filter_events_for_clients(self, user_tuples, events, event_id_to_state): - """ Returns dict of user_id -> list of events that user is allowed to - see. - - Args: - user_tuples (str, bool): (user id, is_peeking) for each user to be - checked. is_peeking should be true if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the - given events - events ([synapse.events.EventBase]): list of events to filter - """ - forgotten = yield defer.gatherResults([ - self.store.who_forgot_in_room( - room_id, - ) - for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) - - # Set of membership event_ids that have been forgotten - event_id_forgotten = frozenset( - row["event_id"] for rows in forgotten for row in rows - ) - - def allowed(event, user_id, is_peeking): - """ - Args: - event (synapse.events.EventBase): event to check - user_id (str) - is_peeking (bool) - """ - state = event_id_to_state[event.event_id] - - # get the room_visibility at the time of the event. - visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) - if visibility_event: - visibility = visibility_event.content.get("history_visibility", "shared") - else: - visibility = "shared" - - if visibility not in VISIBILITY_PRIORITY: - visibility = "shared" - - # if it was world_readable, it's easy: everyone can read it - if visibility == "world_readable": - return True - - # Always allow history visibility events on boundaries. This is done - # by setting the effective visibility to the least restrictive - # of the old vs new. - if event.type == EventTypes.RoomHistoryVisibility: - prev_content = event.unsigned.get("prev_content", {}) - prev_visibility = prev_content.get("history_visibility", None) - - if prev_visibility not in VISIBILITY_PRIORITY: - prev_visibility = "shared" - - new_priority = VISIBILITY_PRIORITY.index(visibility) - old_priority = VISIBILITY_PRIORITY.index(prev_visibility) - if old_priority < new_priority: - visibility = prev_visibility - - # likewise, if the event is the user's own membership event, use - # the 'most joined' membership - membership = None - if event.type == EventTypes.Member and event.state_key == user_id: - membership = event.content.get("membership", None) - if membership not in MEMBERSHIP_PRIORITY: - membership = "leave" - - prev_content = event.unsigned.get("prev_content", {}) - prev_membership = prev_content.get("membership", None) - if prev_membership not in MEMBERSHIP_PRIORITY: - prev_membership = "leave" - - new_priority = MEMBERSHIP_PRIORITY.index(membership) - old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) - if old_priority < new_priority: - membership = prev_membership - - # otherwise, get the user's membership at the time of the event. - if membership is None: - membership_event = state.get((EventTypes.Member, user_id), None) - if membership_event: - if membership_event.event_id not in event_id_forgotten: - membership = membership_event.membership - - # if the user was a member of the room at the time of the event, - # they can see it. - if membership == Membership.JOIN: - return True - - if visibility == "joined": - # we weren't a member at the time of the event, so we can't - # see this event. - return False - - elif visibility == "invited": - # user can also see the event if they were *invited* at the time - # of the event. - return membership == Membership.INVITE - - else: - # visibility is shared: user can also see the event if they have - # become a member since the event - # - # XXX: if the user has subsequently joined and then left again, - # ideally we would share history up to the point they left. But - # we don't know when they left. - return not is_peeking - - defer.returnValue({ - user_id: [ - event - for event in events - if allowed(event, user_id, is_peeking) - ] - for user_id, is_peeking in user_tuples - }) - - @defer.inlineCallbacks - def _filter_events_for_client(self, user_id, events, is_peeking=False): - """ - Check which events a user is allowed to see - - Args: - user_id(str): user id to be checked - events([synapse.events.EventBase]): list of events to be checked - is_peeking(bool): should be True if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the given - events - - Returns: - [synapse.events.EventBase] - """ - types = ( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_id), - ) - event_id_to_state = yield self.store.get_state_for_events( - frozenset(e.event_id for e in events), - types=types - ) - res = yield self.filter_events_for_clients( - [(user_id, is_peeking)], events, event_id_to_state - ) - defer.returnValue(res.get(user_id, [])) - def ratelimit(self, requester): time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( @@ -232,56 +61,6 @@ class BaseHandler(object): retry_after_ms=int(1000 * (time_allowed - time_now)), ) - @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 - - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] - - builder.prev_events = prev_events - builder.depth = depth - - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) - - if builder.is_state(): - builder.prev_state = yield self.store.add_event_hashes( - context.prev_state_events - ) - - yield self.auth.add_auth_events(builder, context) - - add_hashes_and_signatures( - builder, self.server_name, self.signing_key - ) - - event = builder.build() - - logger.debug( - "Created event %s with current state: %s", - event.event_id, context.current_state, - ) - - defer.returnValue( - (event, context,) - ) - def is_host_in_room(self, current_state): room_members = [ (state_key, event.membership) @@ -296,154 +75,13 @@ class BaseHandler(object): return True for (state_key, membership) in room_members: if ( - UserID.from_string(state_key).domain == self.hs.hostname + self.hs.is_mine_id(state_key) and membership == Membership.JOIN ): return True return False @defer.inlineCallbacks - def handle_new_client_event( - self, - requester, - event, - context, - ratelimit=True, - extra_users=[] - ): - # We now need to go and hit out to wherever we need to hit out to. - - if ratelimit: - self.ratelimit(requester) - - try: - self.auth.check(event, auth_events=context.current_state) - except AuthError as err: - logger.warn("Denying new event %r because %s", event, err) - raise err - - yield self.maybe_kick_guest_users(event, context.current_state.values()) - - if event.type == EventTypes.CanonicalAlias: - # Check the alias is acually valid (at this time at least) - room_alias_str = event.content.get("alias", None) - if room_alias_str: - room_alias = RoomAlias.from_string(room_alias_str) - directory_handler = self.hs.get_handlers().directory_handler - mapping = yield directory_handler.get_association(room_alias) - - if mapping["room_id"] != event.room_id: - raise SynapseError( - 400, - "Room alias %s does not point to the room" % ( - room_alias_str, - ) - ) - - federation_handler = self.hs.get_handlers().federation_handler - - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.INVITE: - def is_inviter_member_event(e): - return ( - e.type == EventTypes.Member and - e.sender == event.sender - ) - - event.unsigned["invite_room_state"] = [ - { - "type": e.type, - "state_key": e.state_key, - "content": e.content, - "sender": e.sender, - } - for k, e in context.current_state.items() - if e.type in self.hs.config.room_invite_state_types - or is_inviter_member_event(e) - ] - - invitee = UserID.from_string(event.state_key) - if not self.hs.is_mine(invitee): - # TODO: Can we add signature from remote server in a nicer - # way? If we have been invited by a remote server, we need - # to get them to sign the event. - - returned_invite = yield federation_handler.send_invite( - invitee.domain, - event, - ) - - event.unsigned.pop("room_state", None) - - # TODO: Make sure the signatures actually are correct. - event.signatures.update( - returned_invite.signatures - ) - - if event.type == EventTypes.Redaction: - if self.auth.check_redaction(event, auth_events=context.current_state): - original_event = yield self.store.get_event( - event.redacts, - check_redacted=False, - get_prev_content=False, - allow_rejected=False, - allow_none=False - ) - if event.user_id != original_event.user_id: - raise AuthError( - 403, - "You don't have permission to redact events" - ) - - if event.type == EventTypes.Create and context.current_state: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) - - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, context, self - ) - - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - preserve_fn(self.hs.get_pusherpool().on_new_notifications)( - event_stream_id, max_stream_id - ) - - destinations = set() - for k, s in context.current_state.items(): - try: - if k[0] == EventTypes.Member: - if s.content["membership"] == Membership.JOIN: - destinations.add( - UserID.from_string(s.state_key).domain - ) - except SynapseError: - logger.warn( - "Failed to get destination from event %s", s.event_id - ) - - with PreserveLoggingContext(): - # Don't block waiting on waking up all the listeners. - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - - # If invite, remove room_state from unsigned before sending. - event.unsigned.pop("invite_room_state", None) - - federation_handler.handle_new_event( - event, destinations=destinations, - ) - - @defer.inlineCallbacks def maybe_kick_guest_users(self, event, current_state): # Technically this function invalidates current_state by changing it. # Hopefully this isn't that important to the caller. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 61fe56032a..68d0d78fc6 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -521,11 +521,11 @@ class AuthHandler(BaseHandler): )) return m.serialize() - def generate_short_term_login_token(self, user_id): + def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = login") now = self.hs.get_clock().time_msec() - expiry = now + (2 * 60 * 1000) + expiry = now + duration_in_ms macaroon.add_first_party_caveat("time < %d" % (expiry,)) return macaroon.serialize() @@ -615,4 +615,7 @@ class AuthHandler(BaseHandler): Returns: Whether self.hash(password) == stored_hash (bool). """ - return bcrypt.hashpw(password, stored_hash) == stored_hash + if stored_hash: + return bcrypt.hashpw(password, stored_hash) == stored_hash + else: + return False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d95e0b23b1..c21d9d4d83 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -33,7 +33,7 @@ from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, ) -from synapse.types import UserID +from synapse.types import UserID, get_domian_from_id from synapse.events.utils import prune_event @@ -453,7 +453,7 @@ class FederationHandler(BaseHandler): joined_domains = {} for u, d in joined_users: try: - dom = UserID.from_string(u).domain + dom = get_domian_from_id(u) old_d = joined_domains.get(dom) if old_d: joined_domains[dom] = min(d, old_d) @@ -682,7 +682,8 @@ class FederationHandler(BaseHandler): }) try: - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) except AuthError as e: @@ -743,9 +744,7 @@ class FederationHandler(BaseHandler): try: if k[0] == EventTypes.Member: if s.content["membership"] == Membership.JOIN: - destinations.add( - UserID.from_string(s.state_key).domain - ) + destinations.add(get_domian_from_id(s.state_key)) except: logger.warn( "Failed to get destination from event %s", s.event_id @@ -915,7 +914,8 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) @@ -970,9 +970,7 @@ class FederationHandler(BaseHandler): try: if k[0] == EventTypes.Member: if s.content["membership"] == Membership.LEAVE: - destinations.add( - UserID.from_string(s.state_key).domain - ) + destinations.add(get_domian_from_id(s.state_key)) except: logger.warn( "Failed to get destination from event %s", s.event_id @@ -1115,7 +1113,7 @@ class FederationHandler(BaseHandler): if not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, context, self + event, context ) event_stream_id, max_stream_id = yield self.store.persist_event( @@ -1692,7 +1690,10 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( + builder=builder + ) event, context = yield self.add_display_name_to_third_party_invite( event_dict, event, context @@ -1720,7 +1721,8 @@ class FederationHandler(BaseHandler): def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): builder = self.event_builder_factory.new(event_dict) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) @@ -1759,7 +1761,8 @@ class FederationHandler(BaseHandler): event_dict["content"]["third_party_invite"]["display_name"] = display_name builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event(builder=builder) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f51feda2f4..13154edb78 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -17,13 +17,19 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.streams.config import PaginationConfig +from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.push.action_generator import ActionGenerator +from synapse.streams.config import PaginationConfig +from synapse.types import ( + UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id +) from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.types import UserID, RoomStreamToken, StreamToken +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -123,7 +129,8 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) - events = yield self._filter_events_for_client( + events = yield filter_events_for_client( + self.store, user_id, events, is_peeking=(member_event_id is None), @@ -483,8 +490,8 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) - messages = yield self._filter_events_for_client( - user_id, messages + messages = yield filter_events_for_client( + self.store, user_id, messages ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -619,8 +626,8 @@ class MessageHandler(BaseHandler): end_token=stream_token ) - messages = yield self._filter_events_for_client( - user_id, messages, is_peeking=is_peeking + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking ) start_token = StreamToken.START.copy_and_replace("room_key", token[0]) @@ -700,8 +707,8 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) - messages = yield self._filter_events_for_client( - user_id, messages, is_peeking=is_peeking, + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking, ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -724,3 +731,193 @@ class MessageHandler(BaseHandler): ret["membership"] = membership defer.returnValue(ret) + + @defer.inlineCallbacks + def _create_new_client_event(self, builder, prev_event_ids=None): + if prev_event_ids: + prev_events = yield self.store.add_event_hashes(prev_event_ids) + prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) + depth = prev_max_depth + 1 + else: + latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( + builder.room_id, + ) + + if latest_ret: + depth = max([d for _, _, d in latest_ret]) + 1 + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in latest_ret + ] + + builder.prev_events = prev_events + builder.depth = depth + + state_handler = self.state_handler + + context = yield state_handler.compute_event_context(builder) + + if builder.is_state(): + builder.prev_state = yield self.store.add_event_hashes( + context.prev_state_events + ) + + yield self.auth.add_auth_events(builder, context) + + signing_key = self.hs.config.signing_key[0] + add_hashes_and_signatures( + builder, self.server_name, signing_key + ) + + event = builder.build() + + logger.debug( + "Created event %s with current state: %s", + event.event_id, context.current_state, + ) + + defer.returnValue( + (event, context,) + ) + + @defer.inlineCallbacks + def handle_new_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[] + ): + # We now need to go and hit out to wherever we need to hit out to. + + if ratelimit: + self.ratelimit(requester) + + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as err: + logger.warn("Denying new event %r because %s", event, err) + raise err + + yield self.maybe_kick_guest_users(event, context.current_state.values()) + + if event.type == EventTypes.CanonicalAlias: + # Check the alias is acually valid (at this time at least) + room_alias_str = event.content.get("alias", None) + if room_alias_str: + room_alias = RoomAlias.from_string(room_alias_str) + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if mapping["room_id"] != event.room_id: + raise SynapseError( + 400, + "Room alias %s does not point to the room" % ( + room_alias_str, + ) + ) + + federation_handler = self.hs.get_handlers().federation_handler + + if event.type == EventTypes.Member: + if event.content["membership"] == Membership.INVITE: + def is_inviter_member_event(e): + return ( + e.type == EventTypes.Member and + e.sender == event.sender + ) + + event.unsigned["invite_room_state"] = [ + { + "type": e.type, + "state_key": e.state_key, + "content": e.content, + "sender": e.sender, + } + for k, e in context.current_state.items() + if e.type in self.hs.config.room_invite_state_types + or is_inviter_member_event(e) + ] + + invitee = UserID.from_string(event.state_key) + if not self.hs.is_mine(invitee): + # TODO: Can we add signature from remote server in a nicer + # way? If we have been invited by a remote server, we need + # to get them to sign the event. + + returned_invite = yield federation_handler.send_invite( + invitee.domain, + event, + ) + + event.unsigned.pop("room_state", None) + + # TODO: Make sure the signatures actually are correct. + event.signatures.update( + returned_invite.signatures + ) + + if event.type == EventTypes.Redaction: + if self.auth.check_redaction(event, auth_events=context.current_state): + original_event = yield self.store.get_event( + event.redacts, + check_redacted=False, + get_prev_content=False, + allow_rejected=False, + allow_none=False + ) + if event.user_id != original_event.user_id: + raise AuthError( + 403, + "You don't have permission to redact events" + ) + + if event.type == EventTypes.Create and context.current_state: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) + + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) + + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + + destinations = set() + for k, s in context.current_state.items(): + try: + if k[0] == EventTypes.Member: + if s.content["membership"] == Membership.JOIN: + destinations.add(get_domian_from_id(s.state_key)) + except SynapseError: + logger.warn( + "Failed to get destination from event %s", s.event_id + ) + + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + # If invite, remove room_state from unsigned before sending. + event.unsigned.pop("invite_room_state", None) + + federation_handler.handle_new_event( + event, destinations=destinations, + ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c8f1328b..a8529cce42 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -33,7 +33,7 @@ from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer -from synapse.types import UserID +from synapse.types import UserID, get_domian_from_id import synapse.metrics from ._base import BaseHandler @@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 0 * 1000, + 30 * 1000, self.clock.looping_call, self._handle_timeouts, 5000, @@ -440,7 +440,7 @@ class PresenceHandler(BaseHandler): if not local_states: continue - host = UserID.from_string(user_id).domain + host = get_domian_from_id(user_id) hosts_to_states.setdefault(host, []).extend(local_states) # TODO: de-dup hosts_to_states, as a single host might have multiple diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index b0862067e1..5883b9111e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -358,6 +358,59 @@ class RegistrationHandler(BaseHandler): ) defer.returnValue(data) + @defer.inlineCallbacks + def get_or_create_user(self, localpart, displayname, duration_seconds): + """Creates a new user or returns an access token for an existing one + + Args: + localpart : The local part of the user ID to register. If None, + one will be randomly generated. + Returns: + A tuple of (user_id, access_token). + Raises: + RegistrationError if there was a problem registering. + """ + yield run_on_reactor() + + if localpart is None: + raise SynapseError(400, "Request must include user id") + + need_register = True + + try: + yield self.check_username(localpart) + except SynapseError as e: + if e.errcode == Codes.USER_IN_USE: + need_register = False + else: + raise + + user = UserID(localpart, self.hs.hostname) + user_id = user.to_string() + auth_handler = self.hs.get_handlers().auth_handler + token = auth_handler.generate_short_term_login_token(user_id, duration_seconds) + + if need_register: + yield self.store.register( + user_id=user_id, + token=token, + password_hash=None + ) + + yield registered_user(self.distributor, user) + else: + yield self.store.flush_user(user_id=user_id) + yield self.store.add_access_token_to_user(user_id=user_id, token=token) + + if displayname is not None: + logger.info("setting user display name: %s -> %s", user_id, displayname) + profile_handler = self.hs.get_handlers().profile_handler + yield profile_handler.set_displayname( + user, user, displayname + ) + + defer.returnValue((user_id, token)) + def auth_handler(self): return self.hs.get_handlers().auth_handler diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index dd9c18df84..3d63b3c513 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.util import stringutils from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.visibility import filter_events_for_client from collections import OrderedDict @@ -449,10 +450,12 @@ class RoomContextHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() def filter_evts(events): - return self._filter_events_for_client( + return filter_events_for_client( + self.store, user.to_string(), events, - is_peeking=is_guest) + is_peeking=is_guest + ) event = yield self.store.get_event(event_id, get_prev_content=True, allow_none=True) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed2cda837f..b44e52a515 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -113,7 +113,7 @@ class RoomMemberHandler(BaseHandler): prev_event_ids=prev_event_ids, ) - yield self.handle_new_client_event( + yield msg_handler.handle_new_client_event( requester, event, context, @@ -357,7 +357,7 @@ class RoomMemberHandler(BaseHandler): # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") - yield self.handle_new_client_event( + yield message_handler.handle_new_client_event( requester, event, context, diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 9937d8dd7f..df75d70fac 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.api.filtering import Filter from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event +from synapse.visibility import filter_events_for_client from unpaddedbase64 import decode_base64, encode_base64 @@ -172,8 +173,8 @@ class SearchHandler(BaseHandler): filtered_events = search_filter.filter([r["event"] for r in results]) - events = yield self._filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) events.sort(key=lambda e: -rank_map[e.event_id]) @@ -223,8 +224,8 @@ class SearchHandler(BaseHandler): r["event"] for r in results ]) - events = yield self._filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) room_events.extend(events) @@ -281,12 +282,12 @@ class SearchHandler(BaseHandler): event.room_id, event.event_id, before_limit, after_limit ) - res["events_before"] = yield self._filter_events_for_client( - user.to_string(), res["events_before"] + res["events_before"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_before"] ) - res["events_after"] = yield self._filter_events_for_client( - user.to_string(), res["events_after"] + res["events_after"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_after"] ) res["start"] = now_token.copy_and_replace( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 231140b655..921215469f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -22,6 +22,7 @@ from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user +from synapse.visibility import filter_events_for_client from twisted.internet import defer @@ -247,6 +248,10 @@ class SyncHandler(BaseHandler): sync_config.user.to_string() ) + ignored_users = account_data.get( + "m.ignored_user_list", {} + ).get("ignored_users", {}).keys() + joined = [] invited = [] archived = [] @@ -267,6 +272,8 @@ class SyncHandler(BaseHandler): ) joined.append(room_result) elif event.membership == Membership.INVITE: + if event.sender in ignored_users: + return invite = yield self.store.get_event(event.event_id) invited.append(InvitedSyncResult( room_id=event.room_id, @@ -515,6 +522,15 @@ class SyncHandler(BaseHandler): sync_config.user ) + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( + "m.ignored_user_list", user_id=user_id, + ) + + if ignored_account_data: + ignored_users = ignored_account_data.get("ignored_users", {}).keys() + else: + ignored_users = frozenset() + # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key @@ -549,9 +565,10 @@ class SyncHandler(BaseHandler): # Only bother if we're still currently invited should_invite = non_joins[-1].membership == Membership.INVITE if should_invite: - room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) - if room_sync: - invited.append(room_sync) + if event.sender not in ignored_users: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) # Always include leave/ban events. Just take the last one. # TODO: How do we handle ban -> leave in same batch? @@ -681,7 +698,8 @@ class SyncHandler(BaseHandler): if recents is not None: recents = sync_config.filter_collection.filter_room_timeline(recents) - recents = yield self._filter_events_for_client( + recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), recents, ) @@ -702,7 +720,8 @@ class SyncHandler(BaseHandler): loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) - loaded_recents = yield self._filter_events_for_client( + loaded_recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), loaded_recents, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 6af7a8f424..33b79c0ec7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -21,6 +21,7 @@ from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext from synapse.types import StreamToken +from synapse.visibility import filter_events_for_client import synapse.metrics from collections import namedtuple @@ -398,8 +399,8 @@ class Notifier(object): ) if name == "room": - room_member_handler = self.hs.get_handlers().room_member_handler - new_events = yield room_member_handler._filter_events_for_client( + new_events = yield filter_events_for_client( + self.store, user.to_string(), new_events, is_peeking=is_peeking, diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index a0160994b7..9b208668b6 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -37,14 +37,14 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event, context, handler): + def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "handle_push_actions_for_event"): bulk_evaluator = yield evaluator_for_event( event, self.hs, self.store ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( - event, handler, context.current_state + event, context.current_state ) context.push_actions = [ diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f97df36d80..25e13b3423 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -22,6 +22,7 @@ from .baserules import list_with_base_rules from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_clients logger = logging.getLogger(__name__) @@ -126,7 +127,7 @@ class BulkPushRuleEvaluator: self.store = store @defer.inlineCallbacks - def action_for_event_by_user(self, event, handler, current_state): + def action_for_event_by_user(self, event, current_state): actions_by_user = {} # None of these users can be peeking since this list of users comes @@ -136,8 +137,8 @@ class BulkPushRuleEvaluator: (u, False) for u in self.rules_by_user.keys() ] - filtered_by_user = yield handler.filter_events_for_clients( - user_tuples, [event], {event.event_id: current_state} + filtered_by_user = yield filter_events_for_clients( + self.store, user_tuples, [event], {event.event_id: current_state} ) room_members = yield self.store.get_users_in_room(self.room_id) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py new file mode 100644 index 0000000000..3a13c7485a --- /dev/null +++ b/synapse/push/emailpusher.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +import logging + +from synapse.util.metrics import Measure +from synapse.util.logcontext import LoggingContext + +from mailer import Mailer + +logger = logging.getLogger(__name__) + +# The amount of time we always wait before ever emailing about a notification +# (to give the user a chance to respond to other push or notice the window) +DELAY_BEFORE_MAIL_MS = 2 * 60 * 1000 + +THROTTLE_START_MS = 2 * 60 * 1000 +THROTTLE_MAX_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days +THROTTLE_MULTIPLIER = 2 + +# If no event triggers a notification for this long after the previous, +# the throttle is released. +THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days + + +class EmailPusher(object): + """ + A pusher that sends email notifications about events (approximately) + when they happen. + This shares quite a bit of code with httpusher: it would be good to + factor out the common parts + """ + def __init__(self, hs, pusherdict): + self.hs = hs + self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() + self.pusher_id = pusherdict['id'] + self.user_id = pusherdict['user_name'] + self.app_id = pusherdict['app_id'] + self.email = pusherdict['pushkey'] + self.last_stream_ordering = pusherdict['last_stream_ordering'] + self.timed_call = None + self.throttle_params = None + + # See httppusher + self.max_stream_ordering = None + + self.processing = False + + if self.hs.config.email_enable_notifs: + self.mailer = Mailer(self.hs) + else: + self.mailer = None + + @defer.inlineCallbacks + def on_started(self): + if self.mailer is not None: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + + def on_stop(self): + if self.timed_call: + self.timed_call.cancel() + + @defer.inlineCallbacks + def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + yield self._process() + + def on_new_receipts(self, min_stream_id, max_stream_id): + # We could wake up and cancel the timer but there tend to be quite a + # lot of read receipts so it's probably less work to just let the + # timer fire + return defer.succeed(None) + + @defer.inlineCallbacks + def on_timer(self): + self.timed_call = None + yield self._process() + + @defer.inlineCallbacks + def _process(self): + if self.processing: + return + + with LoggingContext("emailpush._process"): + with Measure(self.clock, "emailpush._process"): + try: + self.processing = True + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering + try: + yield self._unsafe_process() + except: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self.processing = False + + @defer.inlineCallbacks + def _unsafe_process(self): + """ + Main logic of the push loop without the wrapper function that sets + up logging, measures and guards against multiple instances of it + being run. + """ + unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + self.user_id, self.last_stream_ordering, self.max_stream_ordering + ) + + soonest_due_at = None + + for push_action in unprocessed: + received_at = push_action['received_ts'] + if received_at is None: + received_at = 0 + notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS + + room_ready_at = self.room_ready_to_notify_at( + push_action['room_id'] + ) + + should_notify_at = max(notif_ready_at, room_ready_at) + + if should_notify_at < self.clock.time_msec(): + # one of our notifications is ready for sending, so we send + # *one* email updating the user on their notifications, + # we then consider all previously outstanding notifications + # to be delivered. + yield self.send_notification(unprocessed) + + yield self.save_last_stream_ordering_and_success(max([ + ea['stream_ordering'] for ea in unprocessed + ])) + yield self.sent_notif_update_throttle( + push_action['room_id'], push_action + ) + break + else: + if soonest_due_at is None or should_notify_at < soonest_due_at: + soonest_due_at = should_notify_at + + if self.timed_call is not None: + self.timed_call.cancel() + self.timed_call = None + + if soonest_due_at is not None: + self.timed_call = reactor.callLater( + self.seconds_until(soonest_due_at), self.on_timer + ) + + @defer.inlineCallbacks + def save_last_stream_ordering_and_success(self, last_stream_ordering): + self.last_stream_ordering = last_stream_ordering + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, self.email, self.user_id, + last_stream_ordering, self.clock.time_msec() + ) + + def seconds_until(self, ts_msec): + return (ts_msec - self.clock.time_msec()) / 1000 + + def get_room_throttle_ms(self, room_id): + if room_id in self.throttle_params: + return self.throttle_params[room_id]["throttle_ms"] + else: + return 0 + + def get_room_last_sent_ts(self, room_id): + if room_id in self.throttle_params: + return self.throttle_params[room_id]["last_sent_ts"] + else: + return 0 + + def room_ready_to_notify_at(self, room_id): + """ + Determines whether throttling should prevent us from sending an email + for the given room + Returns: True if we should send, False if we should not + """ + last_sent_ts = self.get_room_last_sent_ts(room_id) + throttle_ms = self.get_room_throttle_ms(room_id) + + may_send_at = last_sent_ts + throttle_ms + return may_send_at + + @defer.inlineCallbacks + def sent_notif_update_throttle(self, room_id, notified_push_action): + # We have sent a notification, so update the throttle accordingly. + # If the event that triggered the notif happened more than + # THROTTLE_RESET_AFTER_MS after the previous one that triggered a + # notif, we release the throttle. Otherwise, the throttle is increased. + time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before( + notified_push_action['stream_ordering'] + ) + + time_of_this_notifs = notified_push_action['received_ts'] + + if time_of_previous_notifs is not None and time_of_this_notifs is not None: + gap = time_of_this_notifs - time_of_previous_notifs + else: + # if we don't know the arrival time of one of the notifs (it was not + # stored prior to email notification code) then assume a gap of + # zero which will just not reset the throttle + gap = 0 + + current_throttle_ms = self.get_room_throttle_ms(room_id) + + if gap > THROTTLE_RESET_AFTER_MS: + new_throttle_ms = THROTTLE_START_MS + else: + if current_throttle_ms == 0: + new_throttle_ms = THROTTLE_START_MS + else: + new_throttle_ms = min( + current_throttle_ms * THROTTLE_MULTIPLIER, + THROTTLE_MAX_MS + ) + self.throttle_params[room_id] = { + "last_sent_ts": self.clock.time_msec(), + "throttle_ms": new_throttle_ms + } + yield self.store.set_throttle_params( + self.pusher_id, room_id, self.throttle_params[room_id] + ) + + @defer.inlineCallbacks + def send_notification(self, push_actions): + logger.info("Sending notif email for user %r", self.user_id) + yield self.mailer.send_notification_mail( + self.user_id, self.email, push_actions + ) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py new file mode 100644 index 0000000000..2fd38a036a --- /dev/null +++ b/synapse/push/mailer.py @@ -0,0 +1,456 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from twisted.mail.smtp import sendmail + +import email.utils +import email.mime.multipart +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart + +from synapse.util.async import concurrently_execute +from synapse.util.presentable_names import ( + calculate_room_name, name_from_member_event, descriptor_from_member_events +) +from synapse.types import UserID +from synapse.api.errors import StoreError +from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_client + +import jinja2 +import bleach + +import time +import urllib + +import logging +logger = logging.getLogger(__name__) + + +MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \ + "in the %s room..." +MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..." +MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..." +MESSAGES_IN_ROOM = "There are some messages on %(app)s for you in the %(room)s room..." +MESSAGES_IN_ROOMS = "Here are some messages on %(app)s you may have missed..." +INVITE_FROM_PERSON_TO_ROOM = "%(person)s has invited you to join the " \ + "%(room)s room on %(app)s..." +INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..." + +CONTEXT_BEFORE = 1 +CONTEXT_AFTER = 1 + +# From https://github.com/matrix-org/matrix-react-sdk/blob/master/src/HtmlUtils.js +ALLOWED_TAGS = [ + 'font', # custom to matrix for IRC-style font coloring + 'del', # for markdown + # deliberately no h1/h2 to stop people shouting. + 'h3', 'h4', 'h5', 'h6', 'blockquote', 'p', 'a', 'ul', 'ol', + 'nl', 'li', 'b', 'i', 'u', 'strong', 'em', 'strike', 'code', 'hr', 'br', 'div', + 'table', 'thead', 'caption', 'tbody', 'tr', 'th', 'td', 'pre' +] +ALLOWED_ATTRS = { + # custom ones first: + "font": ["color"], # custom to matrix + "a": ["href", "name", "target"], # remote target: custom to matrix + # We don't currently allow img itself by default, but this + # would make sense if we did + "img": ["src"], +} +# When bleach release a version with this option, we can specify schemes +# ALLOWED_SCHEMES = ["http", "https", "ftp", "mailto"] + + +class Mailer(object): + def __init__(self, hs): + self.hs = hs + self.store = self.hs.get_datastore() + self.state_handler = self.hs.get_state_handler() + loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) + self.app_name = self.hs.config.email_app_name + env = jinja2.Environment(loader=loader) + env.filters["format_ts"] = format_ts_filter + env.filters["mxc_to_http"] = self.mxc_to_http_filter + self.notif_template_html = env.get_template( + self.hs.config.email_notif_template_html + ) + self.notif_template_text = env.get_template( + self.hs.config.email_notif_template_text + ) + + @defer.inlineCallbacks + def send_notification_mail(self, user_id, email_address, push_actions): + raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1] + raw_to = email.utils.parseaddr(email_address)[1] + + if raw_to == '': + raise RuntimeError("Invalid 'to' address") + + rooms_in_order = deduped_ordered_list( + [pa['room_id'] for pa in push_actions] + ) + + notif_events = yield self.store.get_events( + [pa['event_id'] for pa in push_actions] + ) + + notifs_by_room = {} + for pa in push_actions: + notifs_by_room.setdefault(pa["room_id"], []).append(pa) + + # collect the current state for all the rooms in which we have + # notifications + state_by_room = {} + + try: + user_display_name = yield self.store.get_profile_displayname( + UserID.from_string(user_id).localpart + ) + except StoreError: + user_display_name = user_id + + @defer.inlineCallbacks + def _fetch_room_state(room_id): + room_state = yield self.state_handler.get_current_state(room_id) + state_by_room[room_id] = room_state + + # Run at most 3 of these at once: sync does 10 at a time but email + # notifs are much realtime than sync so we can afford to wait a bit. + yield concurrently_execute(_fetch_room_state, rooms_in_order, 3) + + rooms = [] + + for r in rooms_in_order: + roomvars = yield self.get_room_vars( + r, user_id, notifs_by_room[r], notif_events, state_by_room[r] + ) + rooms.append(roomvars) + + summary_text = self.make_summary_text( + notifs_by_room, state_by_room, notif_events, user_id + ) + + template_vars = { + "user_display_name": user_display_name, + "unsubscribe_link": self.make_unsubscribe_link(), + "summary_text": summary_text, + "app_name": self.app_name, + "rooms": rooms, + } + + html_text = self.notif_template_html.render(**template_vars) + html_part = MIMEText(html_text, "html", "utf8") + + plain_text = self.notif_template_text.render(**template_vars) + text_part = MIMEText(plain_text, "plain", "utf8") + + multipart_msg = MIMEMultipart('alternative') + multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text) + multipart_msg['From'] = self.hs.config.email_notif_from + multipart_msg['To'] = email_address + multipart_msg['Date'] = email.utils.formatdate() + multipart_msg['Message-ID'] = email.utils.make_msgid() + multipart_msg.attach(text_part) + multipart_msg.attach(html_part) + + logger.info("Sending email push notification to %s" % email_address) + # logger.debug(html_text) + + yield sendmail( + self.hs.config.email_smtp_host, + raw_from, raw_to, multipart_msg.as_string(), + port=self.hs.config.email_smtp_port + ) + + @defer.inlineCallbacks + def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state): + my_member_event = room_state[("m.room.member", user_id)] + is_invite = my_member_event.content["membership"] == "invite" + + room_vars = { + "title": calculate_room_name(room_state, user_id), + "hash": string_ordinal_total(room_id), # See sender avatar hash + "notifs": [], + "invite": is_invite, + "link": self.make_room_link(room_id), + } + + if not is_invite: + for n in notifs: + notifvars = yield self.get_notif_vars( + n, user_id, notif_events[n['event_id']], room_state + ) + + # merge overlapping notifs together. + # relies on the notifs being in chronological order. + merge = False + if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]: + prev_messages = room_vars['notifs'][-1]['messages'] + for message in notifvars['messages']: + pm = filter(lambda pm: pm['id'] == message['id'], prev_messages) + if pm: + if not message["is_historical"]: + pm[0]["is_historical"] = False + merge = True + elif merge: + # we're merging, so append any remaining messages + # in this notif to the previous one + prev_messages.append(message) + + if not merge: + room_vars['notifs'].append(notifvars) + + defer.returnValue(room_vars) + + @defer.inlineCallbacks + def get_notif_vars(self, notif, user_id, notif_event, room_state): + results = yield self.store.get_events_around( + notif['room_id'], notif['event_id'], + before_limit=CONTEXT_BEFORE, after_limit=CONTEXT_AFTER + ) + + ret = { + "link": self.make_notif_link(notif), + "ts": notif['received_ts'], + "messages": [], + } + + the_events = yield filter_events_for_client( + self.store, user_id, results["events_before"] + ) + the_events.append(notif_event) + + for event in the_events: + messagevars = self.get_message_vars(notif, event, room_state) + if messagevars is not None: + ret['messages'].append(messagevars) + + defer.returnValue(ret) + + def get_message_vars(self, notif, event, room_state): + if event.type != EventTypes.Message: + return None + + sender_state_event = room_state[("m.room.member", event.sender)] + sender_name = name_from_member_event(sender_state_event) + sender_avatar_url = sender_state_event.content["avatar_url"] + + # 'hash' for deterministically picking default images: use + # sender_hash % the number of default images to choose from + sender_hash = string_ordinal_total(event.sender) + + ret = { + "msgtype": event.content["msgtype"], + "is_historical": event.event_id != notif['event_id'], + "id": event.event_id, + "ts": event.origin_server_ts, + "sender_name": sender_name, + "sender_avatar_url": sender_avatar_url, + "sender_hash": sender_hash, + } + + if event.content["msgtype"] == "m.text": + self.add_text_message_vars(ret, event) + elif event.content["msgtype"] == "m.image": + self.add_image_message_vars(ret, event) + + if "body" in event.content: + ret["body_text_plain"] = event.content["body"] + + return ret + + def add_text_message_vars(self, messagevars, event): + if "format" in event.content: + msgformat = event.content["format"] + else: + msgformat = None + messagevars["format"] = msgformat + + if msgformat == "org.matrix.custom.html": + messagevars["body_text_html"] = safe_markup(event.content["formatted_body"]) + else: + messagevars["body_text_html"] = safe_text(event.content["body"]) + + return messagevars + + def add_image_message_vars(self, messagevars, event): + messagevars["image_url"] = event.content["url"] + + return messagevars + + def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id): + if len(notifs_by_room) == 1: + # Only one room has new stuff + room_id = notifs_by_room.keys()[0] + + # If the room has some kind of name, use it, but we don't + # want the generated-from-names one here otherwise we'll + # end up with, "new message from Bob in the Bob room" + room_name = calculate_room_name( + state_by_room[room_id], user_id, fallback_to_members=False + ) + + my_member_event = state_by_room[room_id][("m.room.member", user_id)] + if my_member_event.content["membership"] == "invite": + inviter_member_event = state_by_room[room_id][ + ("m.room.member", my_member_event.sender) + ] + inviter_name = name_from_member_event(inviter_member_event) + + if room_name is None: + return INVITE_FROM_PERSON % { + "person": inviter_name, + "app": self.app_name + } + else: + return INVITE_FROM_PERSON_TO_ROOM % { + "person": inviter_name, + "room": room_name, + "app": self.app_name, + } + + sender_name = None + if len(notifs_by_room[room_id]) == 1: + # There is just the one notification, so give some detail + event = notif_events[notifs_by_room[room_id][0]["event_id"]] + if ("m.room.member", event.sender) in state_by_room[room_id]: + state_event = state_by_room[room_id][("m.room.member", event.sender)] + sender_name = name_from_member_event(state_event) + + if sender_name is not None and room_name is not None: + return MESSAGE_FROM_PERSON_IN_ROOM % { + "person": sender_name, + "room": room_name, + "app": self.app_name, + } + elif sender_name is not None: + return MESSAGE_FROM_PERSON % { + "person": sender_name, + "app": self.app_name, + } + else: + # There's more than one notification for this room, so just + # say there are several + if room_name is not None: + return MESSAGES_IN_ROOM % { + "room": room_name, + "app": self.app_name, + } + else: + # If the room doesn't have a name, say who the messages + # are from explicitly to avoid, "messages in the Bob room" + sender_ids = list(set([ + notif_events[n['event_id']].sender + for n in notifs_by_room[room_id] + ])) + + return MESSAGES_FROM_PERSON % { + "person": descriptor_from_member_events([ + state_by_room[room_id][("m.room.member", s)] + for s in sender_ids + ]), + "app": self.app_name, + } + else: + # Stuff's happened in multiple different rooms + return MESSAGES_IN_ROOMS % { + "app": self.app_name, + } + + def make_room_link(self, room_id): + # need /beta for Universal Links to work on iOS + if self.app_name == "Vector": + return "https://vector.im/beta/#/room/%s" % (room_id,) + else: + return "https://matrix.to/#/%s" % (room_id,) + + def make_notif_link(self, notif): + # need /beta for Universal Links to work on iOS + if self.app_name == "Vector": + return "https://vector.im/beta/#/room/%s/%s" % ( + notif['room_id'], notif['event_id'] + ) + else: + return "https://matrix.to/#/%s/%s" % ( + notif['room_id'], notif['event_id'] + ) + + def make_unsubscribe_link(self): + # XXX: matrix.to + return "https://vector.im/#/settings" + + def mxc_to_http_filter(self, value, width, height, resize_method="crop"): + if value[0:6] != "mxc://": + return "" + + serverAndMediaId = value[6:] + fragment = None + if '#' in serverAndMediaId: + (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1) + fragment = "#" + fragment + + params = { + "width": width, + "height": height, + "method": resize_method, + } + return "%s_matrix/media/v1/thumbnail/%s?%s%s" % ( + self.hs.config.public_baseurl, + serverAndMediaId, + urllib.urlencode(params), + fragment or "", + ) + + +def safe_markup(raw_html): + return jinja2.Markup(bleach.linkify(bleach.clean( + raw_html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS, + # bleach master has this, but it isn't released yet + # protocols=ALLOWED_SCHEMES, + strip=True + ))) + + +def safe_text(raw_text): + """ + Process text: treat it as HTML but escape any tags (ie. just escape the + HTML) then linkify it. + """ + return jinja2.Markup(bleach.linkify(bleach.clean( + raw_text, tags=[], attributes={}, + strip=False + ))) + + +def deduped_ordered_list(l): + seen = set() + ret = [] + for item in l: + if item not in seen: + seen.add(item) + ret.append(item) + return ret + + +def string_ordinal_total(s): + tot = 0 + for c in s: + tot += ord(c) + return tot + + +def format_ts_filter(value, format): + return time.strftime(format, time.localtime(value / 1000)) diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index 4960837504..e6c0806415 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -1,10 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from httppusher import HttpPusher -PUSHER_TYPES = { - 'http': HttpPusher -} +import logging +logger = logging.getLogger(__name__) def create_pusher(hs, pusherdict): + logger.info("trying to create_pusher for %r", pusherdict) + + PUSHER_TYPES = { + "http": HttpPusher, + } + + logger.info("email enable notifs: %r", hs.config.email_enable_notifs) + if hs.config.email_enable_notifs: + from synapse.push.emailpusher import EmailPusher + PUSHER_TYPES["email"] = EmailPusher + logger.info("defined email pusher type") + if pusherdict['kind'] in PUSHER_TYPES: + logger.info("found pusher") return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 6ef48d63f7..5853ec36a9 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,6 @@ from twisted.internet import defer import pusher -from synapse.push import PusherConfigException from synapse.util.logcontext import preserve_fn from synapse.util.async import run_on_reactor @@ -50,6 +49,7 @@ class PusherPool: # recreated, added and started: this means we have only one # code path adding pushers. pusher.create_pusher(self.hs, { + "id": None, "user_name": user_id, "kind": kind, "app_id": app_id, @@ -185,8 +185,8 @@ class PusherPool: for pusherdict in pushers: try: p = pusher.create_pusher(self.hs, pusherdict) - except PusherConfigException: - logger.exception("Couldn't start a pusher: caught PusherConfigException") + except: + logger.exception("Couldn't start a pusher: caught Exception") continue if p: appid_pushkey = "%s:%s" % ( diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 0eb3d6c1de..e0a7a19777 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -44,6 +44,10 @@ CONDITIONAL_REQUIREMENTS = { "preview_url": { "netaddr>=0.7.18": ["netaddr"], }, + "email.enable_notifs": { + "Jinja2>=2.8": ["Jinja2>=2.8"], + "bleach>=1.4.2": ["bleach>=1.4.2"], + }, } diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index ff78c60f13..0e983ae7fa 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -159,6 +159,15 @@ class ReplicationResource(Resource): result = yield self.notifier.wait_for_replication(replicate, timeout) + for stream_name, stream_content in result.items(): + logger.info( + "Replicating %d rows of %s from %s -> %s", + len(stream_content["rows"]), + stream_name, + request_streams.get(stream_name), + stream_content["position"], + ) + request.write(json.dumps(result, ensure_ascii=False)) finish_request(request) diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py new file mode 100644 index 0000000000..f59b0eabbc --- /dev/null +++ b/synapse/replication/slave/storage/account_data.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage.account_data import AccountDataStore + + +class SlavedAccountDataStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedAccountDataStore, self).__init__(db_conn, hs) + self._account_data_id_gen = SlavedIdTracker( + db_conn, "account_data_max_stream_id", "stream_id", + ) + + get_global_account_data_by_type_for_users = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] + ) + + get_global_account_data_by_type_for_user = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] + ) + + def stream_positions(self): + result = super(SlavedAccountDataStore, self).stream_positions() + position = self._account_data_id_gen.get_current_token() + result["user_account_data"] = position + result["room_account_data"] = position + result["tag_account_data"] = position + return result + + def process_replication(self, result): + stream = result.get("user_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + user_id, data_type = row[1:3] + self.get_global_account_data_by_type_for_user.invalidate( + (data_type, user_id,) + ) + + stream = result.get("room_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + + stream = result.get("tag_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 86f00b6ff5..c0d741452d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore): get_unread_event_push_actions_by_room_for_user = ( EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] ) + _get_state_group_for_events = ( + StateStore.__dict__["_get_state_group_for_events"] + ) + _get_state_group_for_event = ( + StateStore.__dict__["_get_state_group_for_event"] + ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -83,6 +95,7 @@ class SlavedEventStore(BaseSlavedStore): DataStore.get_push_action_users_in_range.__func__ ) get_event = DataStore.get_event.__func__ + get_events = DataStore.get_events.__func__ get_current_state = DataStore.get_current_state.__func__ get_current_state_for_key = DataStore.get_current_state_for_key.__func__ get_rooms_for_user_where_membership_is = ( @@ -95,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + get_events_around = DataStore.get_events_around.__func__ + get_state_for_events = DataStore.get_state_for_events.__func__ + get_state_groups = DataStore.get_state_groups.__func__ _set_before_and_after = DataStore._set_before_and_after @@ -104,6 +120,7 @@ class SlavedEventStore(BaseSlavedStore): _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ _parse_events_txn = DataStore._parse_events_txn.__func__ _get_events_txn = DataStore._get_events_txn.__func__ + _get_event_txn = DataStore._get_event_txn.__func__ _enqueue_events = DataStore._enqueue_events.__func__ _do_fetch = DataStore._do_fetch.__func__ _fetch_events_txn = DataStore._fetch_events_txn.__func__ @@ -114,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore): DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ + _get_state_for_groups = DataStore._get_state_for_groups.__func__ + _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ + _get_events_around_txn = DataStore._get_events_around_txn.__func__ + _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() @@ -128,7 +149,7 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("events") if stream: - self._stream_id_gen.advance(stream["position"]) + self._stream_id_gen.advance(int(stream["position"])) for row in stream["rows"]: self._process_replication_row( row, backfilled=False, state_resets=state_resets @@ -136,7 +157,7 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("backfill") if stream: - self._backfill_id_gen.advance(-stream["position"]) + self._backfill_id_gen.advance(-int(stream["position"])) for row in stream["rows"]: self._process_replication_row( row, backfilled=True, state_resets=state_resets @@ -144,12 +165,14 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("forward_ex_outliers") if stream: + self._stream_id_gen.advance(int(stream["position"])) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) stream = result.get("backward_ex_outliers") if stream: + self._backfill_id_gen.advance(-int(stream["position"])) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index 8faddb2595..d88206b3bb 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -43,10 +43,10 @@ class SlavedPusherStore(BaseSlavedStore): def process_replication(self, result): stream = result.get("pushers") if stream: - self._pushers_id_gen.advance(stream["position"]) + self._pushers_id_gen.advance(int(stream["position"])) stream = result.get("deleted_pushers") if stream: - self._pushers_id_gen.advance(stream["position"]) + self._pushers_id_gen.advance(int(stream["position"])) return super(SlavedPusherStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index b55d5dfd08..ec007516d0 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -50,7 +50,7 @@ class SlavedReceiptsStore(BaseSlavedStore): def process_replication(self, result): stream = result.get("receipts") if stream: - self._receipts_id_gen.advance(stream["position"]) + self._receipts_id_gen.advance(int(stream["position"])) for row in stream["rows"]: room_id, receipt_type, user_id = row[1:4] self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 6688fa8fa0..8b223e032b 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -44,6 +44,8 @@ from synapse.rest.client.v2_alpha import ( tokenrefresh, tags, account_data, + report_event, + openid, ) from synapse.http.server import JsonResource @@ -86,3 +88,5 @@ class ClientRestResource(JsonResource): tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) account_data.register_servlets(hs, client_resource) + report_event.register_servlets(hs, client_resource) + openid.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index c6a2ef2ccc..e3f4fbb0bb 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -355,5 +355,76 @@ class RegisterRestServlet(ClientV1RestServlet): ) +class CreateUserRestServlet(ClientV1RestServlet): + """Handles user creation via a server-to-server interface + """ + + PATTERNS = client_path_patterns("/createUser$", releases=()) + + def __init__(self, hs): + super(CreateUserRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + self.direct_user_creation_max_duration = hs.config.user_creation_max_duration + + @defer.inlineCallbacks + def on_POST(self, request): + user_json = parse_json_object_from_request(request) + + if "access_token" not in request.args: + raise SynapseError(400, "Expected application service token.") + + app_service = yield self.store.get_app_service_by_token( + request.args["access_token"][0] + ) + if not app_service: + raise SynapseError(403, "Invalid application service token.") + + logger.debug("creating user: %s", user_json) + + response = yield self._do_create(user_json) + + defer.returnValue((200, response)) + + def on_OPTIONS(self, request): + return 403, {} + + @defer.inlineCallbacks + def _do_create(self, user_json): + yield run_on_reactor() + + if "localpart" not in user_json: + raise SynapseError(400, "Expected 'localpart' key.") + + if "displayname" not in user_json: + raise SynapseError(400, "Expected 'displayname' key.") + + if "duration_seconds" not in user_json: + raise SynapseError(400, "Expected 'duration_seconds' key.") + + localpart = user_json["localpart"].encode("utf-8") + displayname = user_json["displayname"].encode("utf-8") + duration_seconds = 0 + try: + duration_seconds = int(user_json["duration_seconds"]) + except ValueError: + raise SynapseError(400, "Failed to parse 'duration_seconds'") + if duration_seconds > self.direct_user_creation_max_duration: + duration_seconds = self.direct_user_creation_max_duration + + handler = self.handlers.registration_handler + user_id, token = yield handler.get_or_create_user( + localpart=localpart, + displayname=displayname, + duration_seconds=duration_seconds + ) + + defer.returnValue({ + "user_id": user_id, + "access_token": token, + "home_server": self.hs.hostname, + }) + + def register_servlets(hs, http_server): RegisterRestServlet(hs).register(http_server) + CreateUserRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py new file mode 100644 index 0000000000..aa1cae8e1e --- /dev/null +++ b/synapse/rest/client/v2_alpha/openid.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._base import client_v2_patterns + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.api.errors import AuthError +from synapse.util.stringutils import random_string + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class IdTokenServlet(RestServlet): + """ + Get a bearer token that may be passed to a third party to confirm ownership + of a matrix user id. + + The format of the response could be made compatible with the format given + in http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse + + But instead of returning a signed "id_token" the response contains the + name of the issuing matrix homeserver. This means that for now the third + party will need to check the validity of the "id_token" against the + federation /openid/userinfo endpoint of the homeserver. + + Request: + + POST /user/{user_id}/openid/request_token?access_token=... HTTP/1.1 + + {} + + Response: + + HTTP/1.1 200 OK + { + "access_token": "ABDEFGH", + "token_type": "Bearer", + "matrix_server_name": "example.com", + "expires_in": 3600, + } + """ + PATTERNS = client_v2_patterns( + "/user/(?P<user_id>[^/]*)/openid/request_token" + ) + + EXPIRES_MS = 3600 * 1000 + + def __init__(self, hs): + super(IdTokenServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.server_name = hs.config.server_name + + @defer.inlineCallbacks + def on_POST(self, request, user_id): + requester = yield self.auth.get_user_by_req(request) + if user_id != requester.user.to_string(): + raise AuthError(403, "Cannot request tokens for other users.") + + # Parse the request body to make sure it's JSON, but ignore the contents + # for now. + parse_json_object_from_request(request) + + token = random_string(24) + ts_valid_until_ms = self.clock.time_msec() + self.EXPIRES_MS + + yield self.store.insert_open_id_token(token, ts_valid_until_ms, user_id) + + defer.returnValue((200, { + "access_token": token, + "token_type": "Bearer", + "matrix_server_name": self.server_name, + "expires_in": self.EXPIRES_MS / 1000, + })) + + +def register_servlets(hs, http_server): + IdTokenServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index ff8f69ddbf..1ecc02d94d 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -48,6 +48,7 @@ class RegisterRestServlet(RestServlet): super(RegisterRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() + self.store = hs.get_datastore() self.auth_handler = hs.get_handlers().auth_handler self.registration_handler = hs.get_handlers().registration_handler self.identity_handler = hs.get_handlers().identity_handler @@ -214,6 +215,34 @@ class RegisterRestServlet(RestServlet): threepid['validated_at'], ) + # And we add an email pusher for them by default, but only + # if email notifications are enabled (so people don't start + # getting mail spam where they weren't before if email + # notifs are set up on a home server) + if ( + self.hs.config.email_enable_notifs and + self.hs.config.email_notif_for_new_users + ): + # Pull the ID of the access token back out of the db + # It would really make more sense for this to be passed + # up when the access token is saved, but that's quite an + # invasive change I'd rather do separately. + user_tuple = yield self.store.get_user_by_access_token( + token + ) + + yield self.hs.get_pusherpool().add_pusher( + user_id=user_id, + access_token=user_tuple["token_id"], + kind="email", + app_id="m.email", + app_display_name="Email Notifications", + device_display_name=threepid["address"], + pushkey=threepid["address"], + lang=None, # We don't know a user's language here + data={}, + ) + if 'bind_email' in params and params['bind_email']: logger.info("bind_email specified: binding") diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py new file mode 100644 index 0000000000..8903e12405 --- /dev/null +++ b/synapse/rest/client/v2_alpha/report_event.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from ._base import client_v2_patterns + +import logging + + +logger = logging.getLogger(__name__) + + +class ReportEventRestServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/rooms/(?P<room_id>[^/]*)/report/(?P<event_id>[^/]*)$" + ) + + def __init__(self, hs): + super(ReportEventRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_POST(self, request, room_id, event_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + body = parse_json_object_from_request(request) + + yield self.store.add_event_report( + room_id=room_id, + event_id=event_id, + user_id=user_id, + reason=body.get("reason"), + content=body, + received_ts=self.clock.time_msec(), + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + ReportEventRestServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 045ae6c03f..d970fde9e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,7 @@ from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from .openid import OpenIdStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore + EventPushActionsStore, + OpenIdStore, ): def __init__(self, db_conn, hs): @@ -114,6 +116,7 @@ class DataStore(RoomMemberStore, RoomStore, self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") + self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") self._push_rules_stream_id_gen = ChainedIdGenerator( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1e27c2c0ce..e0d7098692 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -453,7 +453,9 @@ class SQLBaseStore(object): keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values insertion_values (dict): key/values to use when inserting - Returns: A deferred + Returns: + Deferred(bool): True if a new entry was created, False if an + existing one was updated. """ return self.runInteraction( desc, @@ -498,6 +500,10 @@ class SQLBaseStore(object): ) txn.execute(sql, allvalues.values()) + return True + else: + return False + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 7a7fbf1e52..ec7e8d40d2 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -16,6 +16,8 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks + import ujson as json import logging @@ -24,6 +26,7 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): + @cached() def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. @@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore): "get_account_data_for_user", get_account_data_for_user_txn ) + @cachedInlineCallbacks(num_args=2) + def get_global_account_data_by_type_for_user(self, data_type, user_id): + """ + Returns: + Deferred: A dict + """ + result = yield self._simple_select_one_onecol( + table="account_data", + keyvalues={ + "user_id": user_id, + "account_data_type": data_type, + }, + retcol="content", + desc="get_global_account_data_by_type_for_user", + allow_none=True, + ) + + if result: + defer.returnValue(json.loads(result)) + else: + defer.returnValue(None) + + @cachedList(cached_method_name="get_global_account_data_by_type_for_user", + num_args=2, list_name="user_ids", inlineCallbacks=True) + def get_global_account_data_by_type_for_users(self, data_type, user_ids): + rows = yield self._simple_select_many_batch( + table="account_data", + column="user_id", + iterable=user_ids, + keyvalues={ + "account_data_type": data_type, + }, + retcols=("user_id", "content",), + desc="get_global_account_data_by_type_for_users", + ) + + defer.returnValue({ + row["user_id"]: json.loads(row["content"]) if row["content"] else None + for row in rows + }) + def get_account_data_for_room(self, user_id, room_id): """Get all the client account_data for a user for a room. @@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore): self._account_data_stream_cache.entity_has_changed, user_id, next_id, ) + txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) self._update_max_stream_id(txn, next_id) with self._account_data_id_gen.get_next() as next_id: @@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore): self._account_data_stream_cache.entity_has_changed, user_id, next_id, ) + txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) + txn.call_after( + self.get_global_account_data_by_type_for_user.invalidate, + (account_data_type, user_id,) + ) self._update_max_stream_id(txn, next_id) with self._account_data_id_gen.get_next() as next_id: diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 86a98b6f11..9705db5c47 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -118,16 +118,19 @@ class EventPushActionsStore(SQLBaseStore): max_stream_ordering=None): def get_after_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep, (" - " SELECT room_id, user_id," - " max(topological_ordering) as topological_ordering," - " max(stream_ordering) as stream_ordering" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " + "FROM (" + " SELECT room_id, user_id, " + " max(topological_ordering) as topological_ordering, " + " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" - ") AS rl " - "WHERE" + ") AS rl," + " event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" " ep.room_id = rl.room_id" " AND (" " ep.topological_ordering > rl.topological_ordering" @@ -153,11 +156,13 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep " - "WHERE ep.room_id not in (" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ? " + " WHERE receipt_type = 'm.read' AND user_id = ?" " GROUP BY room_id" ") AND ep.user_id = ? AND ep.stream_ordering > ?" ) @@ -175,12 +180,30 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue([ { "event_id": row[0], - "stream_ordering": row[1], - "actions": json.loads(row[2]), + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + "received_ts": row[4], } for row in after_read_receipt + no_read_receipt ]) @defer.inlineCallbacks + def get_time_of_last_push_action_before(self, stream_ordering): + def f(txn): + sql = ( + "SELECT e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.stream_ordering > ?" + " ORDER BY ep.stream_ordering ASC" + " LIMIT 1" + ) + txn.execute(sql, (stream_ordering,)) + return txn.fetchone() + result = yield self.runInteraction("get_time_of_last_push_action_before", f) + defer.returnValue(result[0] if result else None) + + @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") @@ -201,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) + def _remove_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id, ) + ) + txn.execute( + "DELETE FROM event_push_actions" + " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", + (room_id, user_id, topological_ordering,) + ) + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0307b2af3c..4655669ba0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,12 +19,14 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json -from collections import namedtuple +from collections import deque, namedtuple + import logging import math @@ -50,28 +52,169 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +class _EventPeristenceQueue(object): + """Queues up events so that they can be persisted in bulk with only one + concurrent transaction per room. + """ + + _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( + "events_and_contexts", "current_state", "backfilled", "deferred", + )) + + def __init__(self): + self._event_persist_queues = {} + self._currently_persisting_rooms = set() + + def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state): + """Add events to the queue, with the given persist_event options. + """ + queue = self._event_persist_queues.setdefault(room_id, deque()) + if queue: + end_item = queue[-1] + if end_item.current_state or current_state: + # We perist events with current_state set to True one at a time + pass + if end_item.backfilled == backfilled: + end_item.events_and_contexts.extend(events_and_contexts) + return end_item.deferred.observe() + + deferred = ObservableDeferred(defer.Deferred()) + + queue.append(self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + current_state=current_state, + deferred=deferred, + )) + + return deferred.observe() + + def handle_queue(self, room_id, per_item_callback): + """Attempts to handle the queue for a room if not already being handled. + + The given callback will be invoked with for each item in the queue,1 + of type _EventPersistQueueItem. The per_item_callback will continuously + be called with new items, unless the queue becomnes empty. The return + value of the function will be given to the deferreds waiting on the item, + exceptions will be passed to the deferres as well. + + This function should therefore be called whenever anything is added + to the queue. + + If another callback is currently handling the queue then it will not be + invoked. + """ + + if room_id in self._currently_persisting_rooms: + return + + self._currently_persisting_rooms.add(room_id) + + @defer.inlineCallbacks + def handle_queue_loop(): + try: + queue = self._get_drainining_queue(room_id) + for item in queue: + try: + ret = yield per_item_callback(item) + item.deferred.callback(ret) + except Exception as e: + item.deferred.errback(e) + finally: + queue = self._event_persist_queues.pop(room_id, None) + if queue: + self._event_persist_queues[room_id] = queue + self._currently_persisting_rooms.discard(room_id) + + preserve_fn(handle_queue_loop)() + + def _get_drainining_queue(self, room_id): + queue = self._event_persist_queues.setdefault(room_id, deque()) + + try: + while True: + yield queue.popleft() + except IndexError: + # Queue has been drained. + pass + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" def __init__(self, hs): super(EventsStore, self).__init__(hs) + self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) - @defer.inlineCallbacks + self._event_persist_queue = _EventPeristenceQueue() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) backfilled: ? + """ + partitioned = {} + for event, ctx in events_and_contexts: + partitioned.setdefault(event.room_id, []).append((event, ctx)) + + deferreds = [] + for room_id, evs_ctxs in partitioned.items(): + d = self._event_persist_queue.add_to_queue( + room_id, evs_ctxs, + backfilled=backfilled, + current_state=None, + ) + deferreds.append(d) - Returns: Tuple of stream_orderings where the first is the minimum and - last is the maximum stream ordering assigned to the events when - persisting. + for room_id in partitioned.keys(): + self._maybe_start_persisting(room_id) - """ + return defer.gatherResults(deferreds, consumeErrors=True) + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, current_state=None, backfilled=False): + deferred = self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], + backfilled=backfilled, + current_state=current_state, + ) + + self._maybe_start_persisting(event.room_id) + + yield deferred + + max_persisted_id = yield self._stream_id_gen.get_current_token() + defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + + def _maybe_start_persisting(self, room_id): + @defer.inlineCallbacks + def persisting_queue(item): + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) + + self._event_persist_queue.handle_queue(room_id, persisting_queue) + + @defer.inlineCallbacks + def _persist_events(self, events_and_contexts, backfilled=False): if not events_and_contexts: return @@ -118,8 +261,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None, backfilled=False): - + def _persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -136,9 +278,6 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass - max_persisted_id = yield self._stream_id_gen.get_current_token() - defer.returnValue((stream_ordering, max_persisted_id)) - @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, @@ -427,6 +566,7 @@ class EventsStore(SQLBaseStore): "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), + "received_ts": self._clock.time_msec(), } for event, _ in events_and_contexts ], diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py new file mode 100644 index 0000000000..5dabb607bd --- /dev/null +++ b/synapse/storage/openid.py @@ -0,0 +1,32 @@ +from ._base import SQLBaseStore + + +class OpenIdStore(SQLBaseStore): + def insert_open_id_token(self, token, ts_valid_until_ms, user_id): + return self._simple_insert( + table="open_id_tokens", + values={ + "token": token, + "ts_valid_until_ms": ts_valid_until_ms, + "user_id": user_id, + }, + desc="insert_open_id_token" + ) + + def get_user_id_for_open_id_token(self, token, ts_now_ms): + def get_user_id_for_token_txn(txn): + sql = ( + "SELECT user_id FROM open_id_tokens" + " WHERE token = ? AND ? <= ts_valid_until_ms" + ) + + txn.execute(sql, (token, ts_now_ms)) + + rows = txn.fetchall() + if not rows: + return None + else: + return rows[0][0] + return self.runInteraction( + "get_user_id_for_token", get_user_id_for_token_txn + ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 57f14fd12b..c8487c8838 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 31 +SCHEMA_VERSION = 32 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 11feb3eb11..9e8e2e2964 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -156,8 +156,7 @@ class PusherStore(SQLBaseStore): profile_tag=""): with self._pushers_id_gen.get_next() as stream_id: def f(txn): - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) - return self._simple_upsert_txn( + newly_inserted = self._simple_upsert_txn( txn, "pushers", { @@ -178,11 +177,18 @@ class PusherStore(SQLBaseStore): "id": stream_id, }, ) - defer.returnValue((yield self.runInteraction("add_pusher", f))) + if newly_inserted: + # get_users_with_pushers_in_room only cares if the user has + # at least *one* pusher. + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + + yield self.runInteraction("add_pusher", f) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + self._simple_delete_one_txn( txn, "pushers", @@ -194,6 +200,7 @@ class PusherStore(SQLBaseStore): {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, {"stream_id": stream_id}, ) + with self._pushers_id_gen.get_next() as stream_id: yield self.runInteraction( "delete_pusher", delete_pusher_txn, stream_id @@ -233,3 +240,30 @@ class PusherStore(SQLBaseStore): {'failing_since': failing_since}, desc="update_pusher_failing_since", ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self._simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room" + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"] + } + + defer.returnValue(params_by_room) + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + yield self._simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params" + ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 935fc503d9..fdcf28f3e1 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000) + @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. @@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) txn.call_after( self._receipts_stream_cache.entity_has_changed, @@ -244,6 +244,17 @@ class ReceiptsStore(SQLBaseStore): (user_id, room_id, receipt_type) ) + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + topological_ordering = int(res["topological_ordering"]) if res else None + stream_ordering = int(res["stream_ordering"]) if res else None + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( @@ -255,16 +266,7 @@ class ReceiptsStore(SQLBaseStore): txn.execute(sql, (room_id, receipt_type, user_id)) results = txn.fetchall() - if results: - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - ) - topological_ordering = int(res["topological_ordering"]) - stream_ordering = int(res["stream_ordering"]) - + if results and topological_ordering: for to, so, _ in results: if int(to) > topological_ordering: return False @@ -294,6 +296,14 @@ class ReceiptsStore(SQLBaseStore): } ) + if receipt_type == "m.read" and topological_ordering: + self._remove_push_actions_before_txn( + txn, + room_id=room_id, + user_id=user_id, + topological_ordering=topological_ordering, + ) + return True @defer.inlineCallbacks @@ -367,7 +377,7 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) self._simple_delete_txn( txn, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7af0cae6a5..bda84a744a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id ) + self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) def _register( @@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( table="users", @@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore): }, { 'password_hash': password_hash }) + self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[]): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 70aa64fb31..26933e593a 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine import collections import logging +import ujson as json logger = logging.getLogger(__name__) @@ -221,3 +222,20 @@ class RoomStore(SQLBaseStore): aliases.extend(e.content['aliases']) defer.returnValue((name, aliases)) + + def add_event_report(self, room_id, event_id, user_id, reason, content, + received_ts): + next_id = self._event_reports_id_gen.get_next() + return self._simple_insert( + table="event_reports", + values={ + "id": next_id, + "received_ts": received_ts, + "room_id": room_id, + "event_id": event_id, + "user_id": user_id, + "reason": reason, + "content": json.dumps(content), + }, + desc="add_event_report" + ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 08a54cbdd1..9d6bfd5245 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -21,7 +21,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import Membership -from synapse.types import UserID +from synapse.types import get_domian_from_id import logging @@ -273,10 +273,7 @@ class RoomMemberStore(SQLBaseStore): room_id, membership=Membership.JOIN ) - joined_domains = set( - UserID.from_string(r["user_id"]).domain - for r in rows - ) + joined_domains = set(get_domian_from_id(r["user_id"]) for r in rows) return joined_domains diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql new file mode 100644 index 0000000000..1dd0f9e170 --- /dev/null +++ b/synapse/storage/schema/delta/32/events.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN received_ts BIGINT; diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql new file mode 100644 index 0000000000..36f37b11c8 --- /dev/null +++ b/synapse/storage/schema/delta/32/openid.sql @@ -0,0 +1,9 @@ + +CREATE TABLE open_id_tokens ( + token TEXT NOT NULL PRIMARY KEY, + ts_valid_until_ms bigint NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (token) +); + +CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql new file mode 100644 index 0000000000..d86d30c13c --- /dev/null +++ b/synapse/storage/schema/delta/32/pusher_throttle.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE pusher_throttle( + pusher BIGINT NOT NULL, + room_id TEXT NOT NULL, + last_sent_ts BIGINT, + throttle_ms BIGINT, + PRIMARY KEY (pusher, room_id) +); diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql new file mode 100644 index 0000000000..f859be46a6 --- /dev/null +++ b/synapse/storage/schema/delta/32/remove_indices.sql @@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- The following indices are redundant, other indices are equivalent or +-- supersets +DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream +DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT + +DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT + +-- The following indices were unused +DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id; +DROP INDEX IF EXISTS evauth_edges_auth_id; +DROP INDEX IF EXISTS presence_stream_state; diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql new file mode 100644 index 0000000000..d13609776f --- /dev/null +++ b/synapse/storage/schema/delta/32/reports.sql @@ -0,0 +1,25 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE event_reports( + id BIGINT NOT NULL PRIMARY KEY, + received_ts BIGINT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + reason TEXT, + content TEXT +); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d338dfcf0a..6c7481a728 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,16 +16,56 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + self.last_transaction = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - next_id = self._transaction_id_gen.get_next() + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. - - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + self.last_transaction[destination] = transaction_id + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!") diff --git a/synapse/types.py b/synapse/types.py index 5b166835bd..42fd9c7204 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -21,6 +21,10 @@ from collections import namedtuple Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) +def get_domian_from_id(string): + return string.split(":", 1)[1] + + class DomainSpecificString( namedtuple("DomainSpecificString", ("localpart", "domain")) ): diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py new file mode 100644 index 0000000000..3efa8a8206 --- /dev/null +++ b/synapse/util/presentable_names.py @@ -0,0 +1,159 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +# intentionally looser than what aliases we allow to be registered since +# other HSes may allow aliases that we would not +ALIAS_RE = re.compile(r"^#.*:.+$") + +ALL_ALONE = "Empty Room" + + +def calculate_room_name(room_state, user_id, fallback_to_members=True): + """ + Works out a user-facing name for the given room as per Matrix + spec recommendations. + Does not yet support internationalisation. + Args: + room_state: Dictionary of the room's state + user_id: The ID of the user to whom the room name is being presented + fallback_to_members: If False, return None instead of generating a name + based on the room's members if the room has no + title or aliases. + + Returns: + (string or None) A human readable name for the room. + """ + # does it have a name? + if ("m.room.name", "") in room_state: + m_room_name = room_state[("m.room.name", "")] + if m_room_name.content and m_room_name.content["name"]: + return m_room_name.content["name"] + + # does it have a canonical alias? + if ("m.room.canonical_alias", "") in room_state: + canon_alias = room_state[("m.room.canonical_alias", "")] + if ( + canon_alias.content and canon_alias.content["alias"] and + _looks_like_an_alias(canon_alias.content["alias"]) + ): + return canon_alias.content["alias"] + + # at this point we're going to need to search the state by all state keys + # for an event type, so rearrange the data structure + room_state_bytype = _state_as_two_level_dict(room_state) + + # right then, any aliases at all? + if "m.room.aliases" in room_state_bytype: + m_room_aliases = room_state_bytype["m.room.aliases"] + if len(m_room_aliases.values()) > 0: + first_alias_event = m_room_aliases.values()[0] + if first_alias_event.content and first_alias_event.content["aliases"]: + the_aliases = first_alias_event.content["aliases"] + if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]): + return the_aliases[0] + + if not fallback_to_members: + return None + + my_member_event = None + if ("m.room.member", user_id) in room_state: + my_member_event = room_state[("m.room.member", user_id)] + + if ( + my_member_event is not None and + my_member_event.content['membership'] == "invite" + ): + if ("m.room.member", my_member_event.sender) in room_state: + inviter_member_event = room_state[("m.room.member", my_member_event.sender)] + return "Invite from %s" % (name_from_member_event(inviter_member_event),) + else: + return "Room Invite" + + # we're going to have to generate a name based on who's in the room, + # so find out who is in the room that isn't the user. + if "m.room.member" in room_state_bytype: + all_members = [ + ev for ev in room_state_bytype["m.room.member"].values() + if ev.content['membership'] == "join" or ev.content['membership'] == "invite" + ] + # Sort the member events oldest-first so the we name people in the + # order the joined (it should at least be deterministic rather than + # dictionary iteration order) + all_members.sort(key=lambda e: e.origin_server_ts) + other_members = [m for m in all_members if m.state_key != user_id] + else: + other_members = [] + all_members = [] + + if len(other_members) == 0: + if len(all_members) == 1: + # self-chat, peeked room with 1 participant, + # or inbound invite, or outbound 3PID invite. + if all_members[0].sender == user_id: + if "m.room.third_party_invite" in room_state_bytype: + third_party_invites = room_state_bytype["m.room.third_party_invite"] + if len(third_party_invites) > 0: + # technically third party invite events are not member + # events, but they are close enough + return "Inviting %s" ( + descriptor_from_member_events(third_party_invites) + ) + else: + return ALL_ALONE + else: + return name_from_member_event(all_members[0]) + else: + return ALL_ALONE + else: + return descriptor_from_member_events(other_members) + + +def descriptor_from_member_events(member_events): + if len(member_events) == 0: + return "nobody" + elif len(member_events) == 1: + return name_from_member_event(member_events[0]) + elif len(member_events) == 2: + return "%s and %s" % ( + name_from_member_event(member_events[0]), + name_from_member_event(member_events[1]), + ) + else: + return "%s and %d others" % ( + name_from_member_event(member_events[0]), + len(member_events) - 1, + ) + + +def name_from_member_event(member_event): + if ( + member_event.content and "displayname" in member_event.content and + member_event.content["displayname"] + ): + return member_event.content["displayname"] + return member_event.state_key + + +def _state_as_two_level_dict(state): + ret = {} + for k, v in state.items(): + ret.setdefault(k[0], {})[k[1]] = v + return ret + + +def _looks_like_an_alias(string): + return ALIAS_RE.match(string) is not None diff --git a/synapse/visibility.py b/synapse/visibility.py new file mode 100644 index 0000000000..948ad51772 --- /dev/null +++ b/synapse/visibility.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 - 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.constants import Membership, EventTypes + +from synapse.util.logcontext import preserve_fn + +import logging + + +logger = logging.getLogger(__name__) + + +VISIBILITY_PRIORITY = ( + "world_readable", + "shared", + "invited", + "joined", +) + + +MEMBERSHIP_PRIORITY = ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, +) + + +@defer.inlineCallbacks +def filter_events_for_clients(store, user_tuples, events, event_id_to_state): + """ Returns dict of user_id -> list of events that user is allowed to + see. + + Args: + user_tuples (str, bool): (user id, is_peeking) for each user to be + checked. is_peeking should be true if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the + given events + events ([synapse.events.EventBase]): list of events to filter + """ + forgotten = yield defer.gatherResults([ + preserve_fn(store.who_forgot_in_room)( + room_id, + ) + for room_id in frozenset(e.room_id for e in events) + ], consumeErrors=True) + + # Set of membership event_ids that have been forgotten + event_id_forgotten = frozenset( + row["event_id"] for rows in forgotten for row in rows + ) + + ignore_dict_content = yield store.get_global_account_data_by_type_for_users( + "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples] + ) + + # FIXME: This will explode if people upload something incorrect. + ignore_dict = { + user_id: frozenset( + content.get("ignored_users", {}).keys() if content else [] + ) + for user_id, content in ignore_dict_content.items() + } + + def allowed(event, user_id, is_peeking, ignore_list): + """ + Args: + event (synapse.events.EventBase): event to check + user_id (str) + is_peeking (bool) + ignore_list (list): list of users to ignore + """ + if not event.is_state() and event.sender in ignore_list: + return False + + state = event_id_to_state[event.event_id] + + # get the room_visibility at the time of the event. + visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) + if visibility_event: + visibility = visibility_event.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility not in VISIBILITY_PRIORITY: + visibility = "shared" + + # if it was world_readable, it's easy: everyone can read it + if visibility == "world_readable": + return True + + # Always allow history visibility events on boundaries. This is done + # by setting the effective visibility to the least restrictive + # of the old vs new. + if event.type == EventTypes.RoomHistoryVisibility: + prev_content = event.unsigned.get("prev_content", {}) + prev_visibility = prev_content.get("history_visibility", None) + + if prev_visibility not in VISIBILITY_PRIORITY: + prev_visibility = "shared" + + new_priority = VISIBILITY_PRIORITY.index(visibility) + old_priority = VISIBILITY_PRIORITY.index(prev_visibility) + if old_priority < new_priority: + visibility = prev_visibility + + # likewise, if the event is the user's own membership event, use + # the 'most joined' membership + membership = None + if event.type == EventTypes.Member and event.state_key == user_id: + membership = event.content.get("membership", None) + if membership not in MEMBERSHIP_PRIORITY: + membership = "leave" + + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership not in MEMBERSHIP_PRIORITY: + prev_membership = "leave" + + new_priority = MEMBERSHIP_PRIORITY.index(membership) + old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) + if old_priority < new_priority: + membership = prev_membership + + # otherwise, get the user's membership at the time of the event. + if membership is None: + membership_event = state.get((EventTypes.Member, user_id), None) + if membership_event: + if membership_event.event_id not in event_id_forgotten: + membership = membership_event.membership + + # if the user was a member of the room at the time of the event, + # they can see it. + if membership == Membership.JOIN: + return True + + if visibility == "joined": + # we weren't a member at the time of the event, so we can't + # see this event. + return False + + elif visibility == "invited": + # user can also see the event if they were *invited* at the time + # of the event. + return membership == Membership.INVITE + + else: + # visibility is shared: user can also see the event if they have + # become a member since the event + # + # XXX: if the user has subsequently joined and then left again, + # ideally we would share history up to the point they left. But + # we don't know when they left. + return not is_peeking + + defer.returnValue({ + user_id: [ + event + for event in events + if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, [])) + ] + for user_id, is_peeking in user_tuples + }) + + +@defer.inlineCallbacks +def filter_events_for_client(store, user_id, events, is_peeking=False): + """ + Check which events a user is allowed to see + + Args: + user_id(str): user id to be checked + events([synapse.events.EventBase]): list of events to be checked + is_peeking(bool): should be True if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the given + events + + Returns: + [synapse.events.EventBase] + """ + types = ( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + event_id_to_state = yield store.get_state_for_events( + frozenset(e.event_id for e in events), + types=types + ) + res = yield filter_events_for_clients( + store, [(user_id, is_peeking)], events, event_id_to_state + ) + defer.returnValue(res.get(user_id, [])) |