diff options
Diffstat (limited to 'synapse')
34 files changed, 1175 insertions, 280 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3b2a2ab77a..022211e34e 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -25,7 +25,7 @@ from twisted.internet import defer import synapse.types from synapse import event_auth from synapse.api.constants import EventTypes, JoinRules, Membership -from synapse.api.errors import AuthError, Codes +from synapse.api.errors import AuthError, Codes, ResourceLimitError from synapse.types import UserID from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache from synapse.util.caches.lrucache import LruCache @@ -784,10 +784,11 @@ class Auth(object): MAU cohort """ if self.hs.config.hs_disabled: - raise AuthError( + raise ResourceLimitError( 403, self.hs.config.hs_disabled_message, errcode=Codes.RESOURCE_LIMIT_EXCEED, admin_uri=self.hs.config.admin_uri, + limit_type=self.hs.config.hs_disabled_limit_type ) if self.hs.config.limit_usage_by_mau is True: # If the user is already part of the MAU cohort @@ -798,8 +799,10 @@ class Auth(object): # Else if there is no room in the MAU bucket, bail current_mau = yield self.store.get_monthly_active_count() if current_mau >= self.hs.config.max_mau_value: - raise AuthError( - 403, "Monthly Active User Limits AU Limit Exceeded", + raise ResourceLimitError( + 403, "Monthly Active User Limit Exceeded", + admin_uri=self.hs.config.admin_uri, - errcode=Codes.RESOURCE_LIMIT_EXCEED + errcode=Codes.RESOURCE_LIMIT_EXCEED, + limit_type="monthly_active_user" ) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 08f0cb5554..e26001ab12 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -224,15 +224,34 @@ class NotFoundError(SynapseError): class AuthError(SynapseError): """An error raised when there was a problem authorising an event.""" - def __init__(self, code, msg, errcode=Codes.FORBIDDEN, admin_uri=None): + + def __init__(self, *args, **kwargs): + if "errcode" not in kwargs: + kwargs["errcode"] = Codes.FORBIDDEN + super(AuthError, self).__init__(*args, **kwargs) + + +class ResourceLimitError(SynapseError): + """ + Any error raised when there is a problem with resource usage. + For instance, the monthly active user limit for the server has been exceeded + """ + def __init__( + self, code, msg, + errcode=Codes.RESOURCE_LIMIT_EXCEED, + admin_uri=None, + limit_type=None, + ): self.admin_uri = admin_uri - super(AuthError, self).__init__(code, msg, errcode=errcode) + self.limit_type = limit_type + super(ResourceLimitError, self).__init__(code, msg, errcode=errcode) def error_dict(self): return cs_error( self.msg, self.errcode, admin_uri=self.admin_uri, + limit_type=self.limit_type ) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 6b77aec832..ab79a45646 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import ( JoinedRoomMemberListRestServlet, @@ -66,7 +66,7 @@ class ClientReaderSlavedStore( DirectoryStore, SlavedApplicationServiceStore, SlavedRegistrationStore, - TransactionStore, + SlavedTransactionStore, SlavedClientIpStore, BaseSlavedStore, ): diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index a385793dd4..03d39968a8 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import ( JoinRoomAliasServlet, @@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator") class EventCreatorSlavedStore( DirectoryStore, - TransactionStore, + SlavedTransactionStore, SlavedProfileStore, SlavedAccountDataStore, SlavedPusherStore, diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 57d96c13a2..7d8105778d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -32,11 +32,17 @@ from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.profile import SlavedProfileStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -49,11 +55,17 @@ logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( + SlavedAccountDataStore, + SlavedProfileStore, + SlavedApplicationServiceStore, + SlavedPusherStore, + SlavedPushRuleStore, + SlavedReceiptsStore, SlavedEventStore, SlavedKeyStore, RoomStore, DirectoryStore, - TransactionStore, + SlavedTransactionStore, BaseSlavedStore, ): pass diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 7bbf0ad082..7a4310ca18 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -36,7 +36,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender") class FederationSenderSlaveStore( - SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, + SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore, SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a98bb506e5..005921dcf7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -525,6 +525,7 @@ def run(hs): clock.looping_call( hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60 ) + hs.get_datastore().reap_monthly_active_users() @defer.inlineCallbacks def generate_monthly_active_users(): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 1423056732..fd1f6cbf7e 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.server import HomeServer @@ -52,7 +52,7 @@ class MediaRepositorySlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedClientIpStore, - TransactionStore, + SlavedTransactionStore, BaseSlavedStore, MediaRepositoryStore, ): diff --git a/synapse/config/server.py b/synapse/config/server.py index 2190f3210a..a41c48e69c 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -81,6 +81,7 @@ class ServerConfig(Config): # Options to disable HS self.hs_disabled = config.get("hs_disabled", False) self.hs_disabled_message = config.get("hs_disabled_message", "") + self.hs_disabled_limit_type = config.get("hs_disabled_limit_type", "") # Admin uri to direct users at should their instance become blocked # due to resource constraints @@ -340,6 +341,32 @@ class ServerConfig(Config): # - port: 9000 # bind_addresses: ['::1', '127.0.0.1'] # type: manhole + + + # Homeserver blocking + # + # How to reach the server admin, used in ResourceLimitError + # admin_uri: 'mailto:admin@server.com' + # + # Global block config + # + # hs_disabled: False + # hs_disabled_message: 'Human readable reason for why the HS is blocked' + # hs_disabled_limit_type: 'error code(str), to help clients decode reason' + # + # Monthly Active User Blocking + # + # Enables monthly active user checking + # limit_usage_by_mau: False + # max_mau_value: 50 + # + # Sometimes the server admin will want to ensure certain accounts are + # never blocked by mau checking. These accounts are specified here. + # + # mau_limit_reserved_threepids: + # - medium: 'email' + # address: 'reserved_user@example.com' + """ % locals() def read_arguments(self, args): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a23136784a..3e0cd294a1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.replication.http.federation import ( + ReplicationFederationSendEduRestServlet, + ReplicationGetQueryRestServlet, +) from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -760,6 +764,8 @@ class FederationHandlerRegistry(object): if edu_type in self.edu_handlers: raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + logger.info("Registering federation EDU handler for %r", edu_type) + self.edu_handlers[edu_type] = handler def register_query_handler(self, query_type, handler): @@ -778,6 +784,8 @@ class FederationHandlerRegistry(object): "Already have a Query handler for %s" % (query_type,) ) + logger.info("Registering federation query handler for %r", query_type) + self.query_handlers[query_type] = handler @defer.inlineCallbacks @@ -800,3 +808,49 @@ class FederationHandlerRegistry(object): raise NotFoundError("No handler for Query type '%s'" % (query_type,)) return handler(args) + + +class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + """A FederationHandlerRegistry for worker processes. + + When receiving EDU or queries it will check if an appropriate handler has + been registered on the worker, if there isn't one then it calls off to the + master process. + """ + + def __init__(self, hs): + self.config = hs.config + self.http_client = hs.get_simple_http_client() + self.clock = hs.get_clock() + + self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) + self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) + + super(ReplicationFederationHandlerRegistry, self).__init__() + + def on_edu(self, edu_type, origin, content): + """Overrides FederationHandlerRegistry + """ + handler = self.edu_handlers.get(edu_type) + if handler: + return super(ReplicationFederationHandlerRegistry, self).on_edu( + edu_type, origin, content, + ) + + return self._send_edu( + edu_type=edu_type, + origin=origin, + content=content, + ) + + def on_query(self, query_type, args): + """Overrides FederationHandlerRegistry + """ + handler = self.query_handlers.get(query_type) + if handler: + return handler(args) + + return self._get_query_client( + query_type=query_type, + args=args, + ) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 6059c3d3a8..4a81bd2ba9 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -520,7 +520,7 @@ class AuthHandler(BaseHandler): """ logger.info("Logging in user %s on device %s", user_id, device_id) access_token = yield self.issue_access_token(user_id, device_id) - yield self.auth.check_auth_blocking() + yield self.auth.check_auth_blocking(user_id) # the device *should* have been registered before we got here; however, # it's possible we raced against a DELETE operation. The thing we @@ -734,7 +734,6 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def validate_short_term_login_token_and_get_user_id(self, login_token): - yield self.auth.check_auth_blocking() auth_api = self.hs.get_auth() user_id = None try: @@ -743,6 +742,7 @@ class AuthHandler(BaseHandler): auth_api.validate_macaroon(macaroon, "login", True, user_id) except Exception: raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN) + yield self.auth.check_auth_blocking(user_id) defer.returnValue(user_id) @defer.inlineCallbacks diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2380d17f4e..f38b393e4a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -49,6 +49,11 @@ from synapse.crypto.event_signing import ( compute_event_signature, ) from synapse.events.validator import EventValidator +from synapse.replication.http.federation import ( + ReplicationCleanRoomRestServlet, + ReplicationFederationSendEventsRestServlet, +) +from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError @@ -91,6 +96,18 @@ class FederationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._server_notices_mxid = hs.config.server_notices_mxid + self.config = hs.config + self.http_client = hs.get_simple_http_client() + + self._send_events_to_master = ( + ReplicationFederationSendEventsRestServlet.make_client(hs) + ) + self._notify_user_membership_change = ( + ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) + ) + self._clean_room_for_join_client = ( + ReplicationCleanRoomRestServlet.make_client(hs) + ) # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -1158,7 +1175,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - yield self._persist_events([(event, context)]) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @@ -1189,7 +1206,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - yield self._persist_events([(event, context)]) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @@ -1432,7 +1449,7 @@ class FederationHandler(BaseHandler): event, context ) - yield self._persist_events( + yield self.persist_events_and_notify( [(event, context)], backfilled=backfilled, ) @@ -1470,7 +1487,7 @@ class FederationHandler(BaseHandler): ], consumeErrors=True, )) - yield self._persist_events( + yield self.persist_events_and_notify( [ (ev_info["event"], context) for ev_info, context in zip(event_infos, contexts) @@ -1558,7 +1575,7 @@ class FederationHandler(BaseHandler): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self._persist_events( + yield self.persist_events_and_notify( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) @@ -1569,7 +1586,7 @@ class FederationHandler(BaseHandler): event, old_state=state ) - yield self._persist_events( + yield self.persist_events_and_notify( [(event, new_event_context)], ) @@ -2297,7 +2314,7 @@ class FederationHandler(BaseHandler): for revocation. """ try: - response = yield self.hs.get_simple_http_client().get_json( + response = yield self.http_client.get_json( url, {"public_key": public_key} ) @@ -2310,7 +2327,7 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Third party certificate was invalid") @defer.inlineCallbacks - def _persist_events(self, event_and_contexts, backfilled=False): + def persist_events_and_notify(self, event_and_contexts, backfilled=False): """Persists events and tells the notifier/pushers about them, if necessary. @@ -2322,14 +2339,21 @@ class FederationHandler(BaseHandler): Returns: Deferred """ - max_stream_id = yield self.store.persist_events( - event_and_contexts, - backfilled=backfilled, - ) + if self.config.worker_app: + yield self._send_events_to_master( + store=self.store, + event_and_contexts=event_and_contexts, + backfilled=backfilled + ) + else: + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled, + ) - if not backfilled: # Never notify for backfilled events - for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) + if not backfilled: # Never notify for backfilled events + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the @@ -2368,9 +2392,25 @@ class FederationHandler(BaseHandler): ) def _clean_room_for_join(self, room_id): - return self.store.clean_room_for_join(room_id) + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Args: + room_id (str) + """ + if self.config.worker_app: + return self._clean_room_for_join_client(room_id) + else: + return self.store.clean_room_for_join(room_id) def user_joined_room(self, user, room_id): """Called when a new user has joined the room """ - return user_joined_room(self.distributor, user, room_id) + if self.config.worker_app: + return self._notify_user_membership_change( + room_id=room_id, + user_id=user.to_string(), + change="joined", + ) + else: + return user_joined_room(self.distributor, user, room_id) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 01a362360e..4d006df63c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -25,7 +25,13 @@ from twisted.internet import defer from twisted.internet.defer import succeed from synapse.api.constants import MAX_DEPTH, EventTypes, Membership -from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError +from synapse.api.errors import ( + AuthError, + Codes, + ConsentNotGivenError, + NotFoundError, + SynapseError, +) from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event @@ -36,6 +42,7 @@ from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -82,28 +89,85 @@ class MessageHandler(object): defer.returnValue(data) @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): + def get_state_events( + self, user_id, room_id, types=None, filtered_types=None, + at_token=None, is_guest=False, + ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has - left the room return the state events from when they left. + left the room return the state events from when they left. If an explicit + 'at' parameter is passed, return the state events as of that event, if + visible. Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + at_token(StreamToken|None): the stream token of the at which we are requesting + the stats. If the user is not allowed to view the state as of that + stream token, we raise a 403 SynapseError. If None, returns the current + state based on the current_state_events table. + is_guest(bool): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] + Raises: + NotFoundError (404) if the at token does not yield an event + + AuthError (403) if the user doesn't have permission to view + members of this room. """ - membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( - room_id, user_id - ) + if at_token: + # FIXME this claims to get the state at a stream position, but + # get_recent_events_for_room operates by topo ordering. This therefore + # does not reliably give you the state at the given stream position. + # (https://github.com/matrix-org/synapse/issues/3305) + last_events, _ = yield self.store.get_recent_events_for_room( + room_id, end_token=at_token.room_key, limit=1, + ) - if membership == Membership.JOIN: - room_state = yield self.state.get_current_state(room_id) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], None + if not last_events: + raise NotFoundError("Can't find event for token %s" % (at_token, )) + + visible_events = yield filter_events_for_client( + self.store, user_id, last_events, ) - room_state = room_state[membership_event_id] + + event = last_events[0] + if visible_events: + room_state = yield self.store.get_state_for_events( + [event.event_id], types, filtered_types=filtered_types, + ) + room_state = room_state[event.event_id] + else: + raise AuthError( + 403, + "User %s not allowed to view events in room %s at token %s" % ( + user_id, room_id, at_token, + ) + ) + else: + membership, membership_event_id = ( + yield self.auth.check_in_room_or_world_readable( + room_id, user_id, + ) + ) + + if membership == Membership.JOIN: + state_ids = yield self.store.get_filtered_current_state_ids( + room_id, types, filtered_types=filtered_types, + ) + room_state = yield self.store.get_events(state_ids.values()) + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], types, filtered_types=filtered_types, + ) + room_state = room_state[membership_event_id] now = self.clock.time_msec() defer.returnValue( @@ -212,10 +276,14 @@ class EventCreationHandler(object): where *hashes* is a map from algorithm to hash. If None, they will be requested from the database. - + Raises: + ResourceLimitError if server is blocked to some resource being + exceeded Returns: Tuple of created event (FrozenEvent), Context """ + yield self.auth.check_auth_blocking(requester.user.to_string()) + builder = self.event_builder_factory.new(event_dict) self.validator.validate_new(builder) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a97d43550f..5170d093e3 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -18,7 +18,7 @@ import logging from twisted.internet import defer from twisted.python.failure import Failure -from synapse.api.constants import Membership +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event from synapse.types import RoomStreamToken @@ -251,6 +251,33 @@ class PaginationHandler(object): is_peeking=(member_event_id is None), ) + state = None + if event_filter and event_filter.lazy_load_members(): + # TODO: remove redundant members + + types = [ + (EventTypes.Member, state_key) + for state_key in set( + event.sender # FIXME: we also care about invite targets etc. + for event in events + ) + ] + + state_ids = yield self.store.get_state_ids_for_event( + events[0].event_id, types=types, + ) + + if state_ids: + state = yield self.store.get_events(list(state_ids.values())) + + if state: + state = yield filter_events_for_client( + self.store, + user_id, + state.values(), + is_peeking=(member_event_id is None), + ) + time_now = self.clock.time_msec() chunk = { @@ -262,4 +289,10 @@ class PaginationHandler(object): "end": next_token.to_string(), } + if state: + chunk["state"] = [ + serialize_event(e, time_now, as_client_event) + for e in state + ] + defer.returnValue(chunk) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6a17c42238..c3f820b975 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -98,9 +98,13 @@ class RoomCreationHandler(BaseHandler): Raises: SynapseError if the room ID couldn't be stored, or something went horribly wrong. + ResourceLimitError if server is blocked to some resource being + exceeded """ user_id = requester.user.to_string() + self.auth.check_auth_blocking(user_id) + if not self.spam_checker.user_may_create_room(user_id): raise SynapseError(403, "You are not permitted to create rooms") diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3b21a04a5d..ac3edf0cc9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -75,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "ephemeral", "account_data", "unread_notifications", + "summary", ])): __slots__ = [] @@ -504,9 +505,141 @@ class SyncHandler(object): defer.returnValue(state) @defer.inlineCallbacks + def compute_summary(self, room_id, sync_config, batch, state, now_token): + """ Works out a room summary block for this room, summarising the number + of joined members in the room, and providing the 'hero' members if the + room has no name so clients can consistently name rooms. Also adds + state events to 'state' if needed to describe the heroes. + + Args: + room_id(str): + sync_config(synapse.handlers.sync.SyncConfig): + batch(synapse.handlers.sync.TimelineBatch): The timeline batch for + the room that will be sent to the user. + state(dict): dict of (type, state_key) -> Event as returned by + compute_state_delta + now_token(str): Token of the end of the current batch. + + Returns: + A deferred dict describing the room summary + """ + + # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305 + last_events, _ = yield self.store.get_recent_event_ids_for_room( + room_id, end_token=now_token.room_key, limit=1, + ) + + if not last_events: + defer.returnValue(None) + return + + last_event = last_events[-1] + state_ids = yield self.store.get_state_ids_for_event( + last_event.event_id, [ + (EventTypes.Member, None), + (EventTypes.Name, ''), + (EventTypes.CanonicalAlias, ''), + ] + ) + + member_ids = { + state_key: event_id + for (t, state_key), event_id in state_ids.iteritems() + if t == EventTypes.Member + } + name_id = state_ids.get((EventTypes.Name, '')) + canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, '')) + + summary = {} + + # FIXME: it feels very heavy to load up every single membership event + # just to calculate the counts. + member_events = yield self.store.get_events(member_ids.values()) + + joined_user_ids = [] + invited_user_ids = [] + + for ev in member_events.values(): + if ev.content.get("membership") == Membership.JOIN: + joined_user_ids.append(ev.state_key) + elif ev.content.get("membership") == Membership.INVITE: + invited_user_ids.append(ev.state_key) + + # TODO: only send these when they change. + summary["m.joined_member_count"] = len(joined_user_ids) + summary["m.invited_member_count"] = len(invited_user_ids) + + if name_id or canonical_alias_id: + defer.returnValue(summary) + + # FIXME: order by stream ordering, not alphabetic + + me = sync_config.user.to_string() + if (joined_user_ids or invited_user_ids): + summary['m.heroes'] = sorted( + [ + user_id + for user_id in (joined_user_ids + invited_user_ids) + if user_id != me + ] + )[0:5] + else: + summary['m.heroes'] = sorted( + [user_id for user_id in member_ids.keys() if user_id != me] + )[0:5] + + if not sync_config.filter_collection.lazy_load_members(): + defer.returnValue(summary) + + # ensure we send membership events for heroes if needed + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.get_lazy_loaded_members_cache(cache_key) + + # track which members the client should already know about via LL: + # Ones which are already in state... + existing_members = set( + user_id for (typ, user_id) in state.keys() + if typ == EventTypes.Member + ) + + # ...or ones which are in the timeline... + for ev in batch.events: + if ev.type == EventTypes.Member: + existing_members.add(ev.state_key) + + # ...and then ensure any missing ones get included in state. + missing_hero_event_ids = [ + member_ids[hero_id] + for hero_id in summary['m.heroes'] + if ( + cache.get(hero_id) != member_ids[hero_id] and + hero_id not in existing_members + ) + ] + + missing_hero_state = yield self.store.get_events(missing_hero_event_ids) + missing_hero_state = missing_hero_state.values() + + for s in missing_hero_state: + cache.set(s.state_key, s.event_id) + state[(EventTypes.Member, s.state_key)] = s + + defer.returnValue(summary) + + def get_lazy_loaded_members_cache(self, cache_key): + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + logger.debug("creating LruCache for %r", cache_key) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) + self.lazy_loaded_members_cache[cache_key] = cache + else: + logger.debug("found LruCache for %r", cache_key) + return cache + + @defer.inlineCallbacks def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, full_state): - """ Works out the differnce in state between the start of the timeline + """ Works out the difference in state between the start of the timeline and the previous sync. Args: @@ -520,7 +653,7 @@ class SyncHandler(object): full_state(bool): Whether to force returning the full state. Returns: - A deferred new event dictionary + A deferred dict of (type, state_key) -> Event """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -618,13 +751,7 @@ class SyncHandler(object): if lazy_load_members and not include_redundant_members: cache_key = (sync_config.user.to_string(), sync_config.device_id) - cache = self.lazy_loaded_members_cache.get(cache_key) - if cache is None: - logger.debug("creating LruCache for %r", cache_key) - cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) - self.lazy_loaded_members_cache[cache_key] = cache - else: - logger.debug("found LruCache for %r", cache_key) + cache = self.get_lazy_loaded_members_cache(cache_key) # if it's a new sync sequence, then assume the client has had # amnesia and doesn't want any recent lazy-loaded members @@ -1425,7 +1552,6 @@ class SyncHandler(object): if events == [] and tags is None: return - since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -1468,6 +1594,18 @@ class SyncHandler(object): full_state=full_state ) + summary = {} + if ( + sync_config.filter_collection.lazy_load_members() and + ( + any(ev.type == EventTypes.Member for ev in batch.events) or + since_token is None + ) + ): + summary = yield self.compute_summary( + room_id, sync_config, batch, state, now_token + ) + if room_builder.rtype == "joined": unread_notifications = {} room_sync = JoinedSyncResult( @@ -1477,6 +1615,7 @@ class SyncHandler(object): ephemeral=ephemeral, account_data=account_data_events, unread_notifications=unread_notifications, + summary=summary, ) if room_sync or always_include: diff --git a/synapse/http/server.py b/synapse/http/server.py index 5733abb5dc..2d5c23e673 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -25,7 +25,7 @@ from canonicaljson import encode_canonical_json, encode_pretty_printed_json, jso from twisted.internet import defer from twisted.python import failure -from twisted.web import resource, server +from twisted.web import resource from twisted.web.server import NOT_DONE_YET from twisted.web.static import NoRangeStaticProducer from twisted.web.util import redirectTo @@ -38,10 +38,8 @@ from synapse.api.errors import ( SynapseError, UnrecognizedRequestError, ) -from synapse.http.request_metrics import requests_counter from synapse.util.caches import intern_dict -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.util.metrics import Measure +from synapse.util.logcontext import preserve_fn if PY3: from io import BytesIO @@ -66,11 +64,10 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html> def wrap_json_request_handler(h): """Wraps a request handler method with exception handling. - Also adds logging as per wrap_request_handler_with_logging. + Also does the wrapping with request.processing as per wrap_async_request_handler. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. The handler must return a deferred. If the deferred succeeds we assume that a response has been sent. If the deferred fails with a SynapseError we use @@ -114,24 +111,23 @@ def wrap_json_request_handler(h): pretty_print=_request_user_agent_is_curl(request), ) - return wrap_request_handler_with_logging(wrapped_request_handler) + return wrap_async_request_handler(wrapped_request_handler) def wrap_html_request_handler(h): """Wraps a request handler method with exception handling. - Also adds logging as per wrap_request_handler_with_logging. + Also does the wrapping with request.processing as per wrap_async_request_handler. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. """ def wrapped_request_handler(self, request): d = defer.maybeDeferred(h, self, request) d.addErrback(_return_html_error, request) return d - return wrap_request_handler_with_logging(wrapped_request_handler) + return wrap_async_request_handler(wrapped_request_handler) def _return_html_error(f, request): @@ -176,46 +172,26 @@ def _return_html_error(f, request): finish_request(request) -def wrap_request_handler_with_logging(h): - """Wraps a request handler to provide logging and metrics +def wrap_async_request_handler(h): + """Wraps an async request handler so that it calls request.processing. + + This helps ensure that work done by the request handler after the request is completed + is correctly recorded against the request metrics/logs. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. - As well as calling `request.processing` (which will log the response and - duration for this request), the wrapped request handler will insert the - request id into the logging context. + The handler may return a deferred, in which case the completion of the request isn't + logged until the deferred completes. """ @defer.inlineCallbacks - def wrapped_request_handler(self, request): - """ - Args: - self: - request (synapse.http.site.SynapseRequest): - """ + def wrapped_async_request_handler(self, request): + with request.processing(): + yield h(self, request) - request_id = request.get_request_id() - with LoggingContext(request_id) as request_context: - request_context.request = request_id - with Measure(self.clock, "wrapped_request_handler"): - # we start the request metrics timer here with an initial stab - # at the servlet name. For most requests that name will be - # JsonResource (or a subclass), and JsonResource._async_render - # will update it once it picks a servlet. - servlet_name = self.__class__.__name__ - with request.processing(servlet_name): - with PreserveLoggingContext(request_context): - d = defer.maybeDeferred(h, self, request) - - # record the arrival of the request *after* - # dispatching to the handler, so that the handler - # can update the servlet name in the request - # metrics - requests_counter.labels(request.method, - request.request_metrics.name).inc() - yield d - return wrapped_request_handler + # we need to preserve_fn here, because the synchronous render method won't yield for + # us (obviously) + return preserve_fn(wrapped_async_request_handler) class HttpServer(object): @@ -278,7 +254,7 @@ class JsonResource(HttpServer, resource.Resource): """ This gets called by twisted every time someone sends us a request. """ self._async_render(request) - return server.NOT_DONE_YET + return NOT_DONE_YET @wrap_json_request_handler @defer.inlineCallbacks diff --git a/synapse/http/site.py b/synapse/http/site.py index 5fd30a4c2c..f5a8f78406 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import contextlib import logging import time @@ -19,8 +18,8 @@ import time from twisted.web.server import Request, Site from synapse.http import redact_uri -from synapse.http.request_metrics import RequestMetrics -from synapse.util.logcontext import ContextResourceUsage, LoggingContext +from synapse.http.request_metrics import RequestMetrics, requests_counter +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext logger = logging.getLogger(__name__) @@ -34,25 +33,43 @@ class SynapseRequest(Request): It extends twisted's twisted.web.server.Request, and adds: * Unique request ID + * A log context associated with the request * Redaction of access_token query-params in __repr__ * Logging at start and end * Metrics to record CPU, wallclock and DB time by endpoint. - It provides a method `processing` which should be called by the Resource - which is handling the request, and returns a context manager. + It also provides a method `processing`, which returns a context manager. If this + method is called, the request won't be logged until the context manager is closed; + this is useful for asynchronous request handlers which may go on processing the + request even after the client has disconnected. + Attributes: + logcontext(LoggingContext) : the log context for this request """ def __init__(self, site, channel, *args, **kw): Request.__init__(self, channel, *args, **kw) self.site = site - self._channel = channel + self._channel = channel # this is used by the tests self.authenticated_entity = None self.start_time = 0 + # we can't yet create the logcontext, as we don't know the method. + self.logcontext = None + global _next_request_seq self.request_seq = _next_request_seq _next_request_seq += 1 + # whether an asynchronous request handler has called processing() + self._is_processing = False + + # the time when the asynchronous request handler completed its processing + self._processing_finished_time = None + + # what time we finished sending the response to the client (or the connection + # dropped) + self.finish_time = None + def __repr__(self): # We overwrite this so that we don't log ``access_token`` return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( @@ -74,11 +91,116 @@ class SynapseRequest(Request): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] def render(self, resrc): + # this is called once a Resource has been found to serve the request; in our + # case the Resource in question will normally be a JsonResource. + + # create a LogContext for this request + request_id = self.get_request_id() + logcontext = self.logcontext = LoggingContext(request_id) + logcontext.request = request_id + # override the Server header which is set by twisted self.setHeader("Server", self.site.server_version_string) - return Request.render(self, resrc) + + with PreserveLoggingContext(self.logcontext): + # we start the request metrics timer here with an initial stab + # at the servlet name. For most requests that name will be + # JsonResource (or a subclass), and JsonResource._async_render + # will update it once it picks a servlet. + servlet_name = resrc.__class__.__name__ + self._started_processing(servlet_name) + + Request.render(self, resrc) + + # record the arrival of the request *after* + # dispatching to the handler, so that the handler + # can update the servlet name in the request + # metrics + requests_counter.labels(self.method, + self.request_metrics.name).inc() + + @contextlib.contextmanager + def processing(self): + """Record the fact that we are processing this request. + + Returns a context manager; the correct way to use this is: + + @defer.inlineCallbacks + def handle_request(request): + with request.processing("FooServlet"): + yield really_handle_the_request() + + Once the context manager is closed, the completion of the request will be logged, + and the various metrics will be updated. + """ + if self._is_processing: + raise RuntimeError("Request is already processing") + self._is_processing = True + + try: + yield + except Exception: + # this should already have been caught, and sent back to the client as a 500. + logger.exception("Asynchronous messge handler raised an uncaught exception") + finally: + # the request handler has finished its work and either sent the whole response + # back, or handed over responsibility to a Producer. + + self._processing_finished_time = time.time() + self._is_processing = False + + # if we've already sent the response, log it now; otherwise, we wait for the + # response to be sent. + if self.finish_time is not None: + self._finished_processing() + + def finish(self): + """Called when all response data has been written to this Request. + + Overrides twisted.web.server.Request.finish to record the finish time and do + logging. + """ + self.finish_time = time.time() + Request.finish(self) + if not self._is_processing: + with PreserveLoggingContext(self.logcontext): + self._finished_processing() + + def connectionLost(self, reason): + """Called when the client connection is closed before the response is written. + + Overrides twisted.web.server.Request.connectionLost to record the finish time and + do logging. + """ + self.finish_time = time.time() + Request.connectionLost(self, reason) + + # we only get here if the connection to the client drops before we send + # the response. + # + # It's useful to log it here so that we can get an idea of when + # the client disconnects. + with PreserveLoggingContext(self.logcontext): + logger.warn( + "Error processing request: %s %s", reason.type, reason.value, + ) + + if not self._is_processing: + self._finished_processing() def _started_processing(self, servlet_name): + """Record the fact that we are processing this request. + + This will log the request's arrival. Once the request completes, + be sure to call finished_processing. + + Args: + servlet_name (str): the name of the servlet which will be + processing this request. This is used in the metrics. + + It is possible to update this afterwards by updating + self.request_metrics.name. + """ self.start_time = time.time() self.request_metrics = RequestMetrics() self.request_metrics.start( @@ -94,13 +216,21 @@ class SynapseRequest(Request): ) def _finished_processing(self): - try: - context = LoggingContext.current_context() - usage = context.get_resource_usage() - except Exception: - usage = ContextResourceUsage() + """Log the completion of this request and update the metrics + """ + + usage = self.logcontext.get_resource_usage() + + if self._processing_finished_time is None: + # we completed the request without anything calling processing() + self._processing_finished_time = time.time() - end_time = time.time() + # the time between receiving the request and the request handler finishing + processing_time = self._processing_finished_time - self.start_time + + # the time between the request handler finishing and the response being sent + # to the client (nb may be negative) + response_send_time = self.finish_time - self._processing_finished_time # need to decode as it could be raw utf-8 bytes # from a IDN servname in an auth header @@ -116,22 +246,31 @@ class SynapseRequest(Request): user_agent = self.get_user_agent() if user_agent is not None: user_agent = user_agent.decode("utf-8", "replace") + else: + user_agent = "-" + + code = str(self.code) + if not self.finished: + # we didn't send the full response before we gave up (presumably because + # the connection dropped) + code += "!" self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" + " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]", self.getClientIP(), self.site.site_tag, authenticated_entity, - end_time - self.start_time, + processing_time, + response_send_time, usage.ru_utime, usage.ru_stime, usage.db_sched_duration_sec, usage.db_txn_duration_sec, int(usage.db_txn_count), self.sentLength, - self.code, + code, self.method, self.get_redacted_uri(), self.clientproto, @@ -140,38 +279,10 @@ class SynapseRequest(Request): ) try: - self.request_metrics.stop(end_time, self) + self.request_metrics.stop(self.finish_time, self) except Exception as e: logger.warn("Failed to stop metrics: %r", e) - @contextlib.contextmanager - def processing(self, servlet_name): - """Record the fact that we are processing this request. - - Returns a context manager; the correct way to use this is: - - @defer.inlineCallbacks - def handle_request(request): - with request.processing("FooServlet"): - yield really_handle_the_request() - - This will log the request's arrival. Once the context manager is - closed, the completion of the request will be logged, and the various - metrics will be updated. - - Args: - servlet_name (str): the name of the servlet which will be - processing this request. This is used in the metrics. - - It is possible to update this afterwards by updating - self.request_metrics.servlet_name. - """ - # TODO: we should probably just move this into render() and finish(), - # to save having to call a separate method. - self._started_processing(servlet_name) - yield - self._finished_processing() - class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 589ee94c66..19f214281e 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import membership, send_event +from synapse.replication.http import federation, membership, send_event REPLICATION_PREFIX = "/_synapse/replication" @@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) membership.register_servlets(hs, self) + federation.register_servlets(hs, self) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py new file mode 100644 index 0000000000..2ddd18f73b --- /dev/null +++ b/synapse/replication/http/federation.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.events import FrozenEvent +from synapse.events.snapshot import EventContext +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.util.metrics import Measure + +logger = logging.getLogger(__name__) + + +class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): + """Handles events newly received from federation, including persisting and + notifying. + + The API looks like: + + POST /_synapse/replication/fed_send_events/:txn_id + + { + "events": [{ + "event": { .. serialized event .. }, + "internal_metadata": { .. serialized internal_metadata .. }, + "rejected_reason": .., // The event.rejected_reason field + "context": { .. serialized event context .. }, + }], + "backfilled": false + """ + + NAME = "fed_send_events" + PATH_ARGS = () + + def __init__(self, hs): + super(ReplicationFederationSendEventsRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.federation_handler = hs.get_handlers().federation_handler + + @staticmethod + @defer.inlineCallbacks + def _serialize_payload(store, event_and_contexts, backfilled): + """ + Args: + store + event_and_contexts (list[tuple[FrozenEvent, EventContext]]) + backfilled (bool): Whether or not the events are the result of + backfilling + """ + event_payloads = [] + for event, context in event_and_contexts: + serialized_context = yield context.serialize(event, store) + + event_payloads.append({ + "event": event.get_pdu_json(), + "internal_metadata": event.internal_metadata.get_dict(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + }) + + payload = { + "events": event_payloads, + "backfilled": backfilled, + } + + defer.returnValue(payload) + + @defer.inlineCallbacks + def _handle_request(self, request): + with Measure(self.clock, "repl_fed_send_events_parse"): + content = parse_json_object_from_request(request) + + backfilled = content["backfilled"] + + event_payloads = content["events"] + + event_and_contexts = [] + for event_payload in event_payloads: + event_dict = event_payload["event"] + internal_metadata = event_payload["internal_metadata"] + rejected_reason = event_payload["rejected_reason"] + event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + + context = yield EventContext.deserialize( + self.store, event_payload["context"], + ) + + event_and_contexts.append((event, context)) + + logger.info( + "Got %d events from federation", + len(event_and_contexts), + ) + + yield self.federation_handler.persist_events_and_notify( + event_and_contexts, backfilled, + ) + + defer.returnValue((200, {})) + + +class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): + """Handles EDUs newly received from federation, including persisting and + notifying. + + Request format: + + POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id + + { + "origin": ..., + "content: { ... } + } + """ + + NAME = "fed_send_edu" + PATH_ARGS = ("edu_type",) + + def __init__(self, hs): + super(ReplicationFederationSendEduRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(edu_type, origin, content): + return { + "origin": origin, + "content": content, + } + + @defer.inlineCallbacks + def _handle_request(self, request, edu_type): + with Measure(self.clock, "repl_fed_send_edu_parse"): + content = parse_json_object_from_request(request) + + origin = content["origin"] + edu_content = content["content"] + + logger.info( + "Got %r edu from $s", + edu_type, origin, + ) + + result = yield self.registry.on_edu(edu_type, origin, edu_content) + + defer.returnValue((200, result)) + + +class ReplicationGetQueryRestServlet(ReplicationEndpoint): + """Handle responding to queries from federation. + + Request format: + + POST /_synapse/replication/fed_query/:query_type + + { + "args": { ... } + } + """ + + NAME = "fed_query" + PATH_ARGS = ("query_type",) + + # This is a query, so let's not bother caching + CACHE = False + + def __init__(self, hs): + super(ReplicationGetQueryRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(query_type, args): + """ + Args: + query_type (str) + args (dict): The arguments received for the given query type + """ + return { + "args": args, + } + + @defer.inlineCallbacks + def _handle_request(self, request, query_type): + with Measure(self.clock, "repl_fed_query_parse"): + content = parse_json_object_from_request(request) + + args = content["args"] + + logger.info( + "Got %r query", + query_type, + ) + + result = yield self.registry.on_query(query_type, args) + + defer.returnValue((200, result)) + + +class ReplicationCleanRoomRestServlet(ReplicationEndpoint): + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Request format: + + POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id + + {} + """ + + NAME = "fed_cleanup_room" + PATH_ARGS = ("room_id",) + + def __init__(self, hs): + super(ReplicationCleanRoomRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + + @staticmethod + def _serialize_payload(room_id, args): + """ + Args: + room_id (str) + """ + return {} + + @defer.inlineCallbacks + def _handle_request(self, request, room_id): + yield self.store.clean_room_for_join(room_id) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + ReplicationFederationSendEventsRestServlet(hs).register(http_server) + ReplicationFederationSendEduRestServlet(hs).register(http_server) + ReplicationGetQueryRestServlet(hs).register(http_server) + ReplicationCleanRoomRestServlet(hs).register(http_server) diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index 9c9a5eadd9..3527beb3c9 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -13,19 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore from synapse.storage.transactions import TransactionStore from ._base import BaseSlavedStore -class TransactionStore(BaseSlavedStore): - get_destination_retry_timings = TransactionStore.__dict__[ - "get_destination_retry_timings" - ] - _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ - set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__ - _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__ - - prep_send_transaction = DataStore.prep_send_transaction.__func__ - delivered_txn = DataStore.delivered_txn.__func__ +class SlavedTransactionStore(TransactionStore, BaseSlavedStore): + pass diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index fa5989e74e..fcc1091760 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -34,7 +34,7 @@ from synapse.http.servlet import ( parse_string, ) from synapse.streams.config import PaginationConfig -from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID +from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID from .base import ClientV1RestServlet, client_path_patterns @@ -384,15 +384,39 @@ class RoomMemberListRestServlet(ClientV1RestServlet): def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) - events = yield self.message_handler.get_state_events( + handler = self.message_handler + + # request the state as of a given event, as identified by a stream token, + # for consistency with /messages etc. + # useful for getting the membership in retrospect as of a given /sync + # response. + at_token_string = parse_string(request, "at") + if at_token_string is None: + at_token = None + else: + at_token = StreamToken.from_string(at_token_string) + + # let you filter down on particular memberships. + # XXX: this may not be the best shape for this API - we could pass in a filter + # instead, except filters aren't currently aware of memberships. + # See https://github.com/matrix-org/matrix-doc/issues/1337 for more details. + membership = parse_string(request, "membership") + not_membership = parse_string(request, "not_membership") + + events = yield handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), + at_token=at_token, + types=[(EventTypes.Member, None)], ) chunk = [] for event in events: - if event["type"] != EventTypes.Member: + if ( + (membership and event['content'].get("membership") != membership) or + (not_membership and event['content'].get("membership") == not_membership) + ): continue chunk.append(event) @@ -401,6 +425,8 @@ class RoomMemberListRestServlet(ClientV1RestServlet): })) +# deprecated in favour of /members?membership=join? +# except it does custom AS logic and has a simpler return format class JoinedRoomMemberListRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$") diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 8aa06faf23..1275baa1ba 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -370,6 +370,7 @@ class SyncRestServlet(RestServlet): ephemeral_events = room.ephemeral result["ephemeral"] = {"events": ephemeral_events} result["unread_notifications"] = room.unread_notifications + result["summary"] = room.summary return result diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 6ac2987b98..29e62bfcdd 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -27,11 +27,22 @@ class VersionsRestServlet(RestServlet): def on_GET(self, request): return (200, { "versions": [ + # XXX: at some point we need to decide whether we need to include + # the previous version numbers, given we've defined r0.3.0 to be + # backwards compatible with r0.2.0. But need to check how + # conscientious we've been in compatibility, and decide whether the + # middle number is the major revision when at 0.X.Y (as opposed to + # X.Y.Z). And we need to decide whether it's fair to make clients + # parse the version string to figure out what's going on. "r0.0.1", "r0.1.0", "r0.2.0", "r0.3.0", - ] + ], + # as per MSC1497: + "unstable_features": { + "m.lazy_load_members": True, + } }) diff --git a/synapse/rest/media/v1/config_resource.py b/synapse/rest/media/v1/config_resource.py new file mode 100644 index 0000000000..d6605b6027 --- /dev/null +++ b/synapse/rest/media/v1/config_resource.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 Will Hunt <will@half-shot.uk> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from twisted.internet import defer +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET + +from synapse.http.server import respond_with_json, wrap_json_request_handler + + +class MediaConfigResource(Resource): + isLeaf = True + + def __init__(self, hs): + Resource.__init__(self) + config = hs.get_config() + self.clock = hs.get_clock() + self.auth = hs.get_auth() + self.limits_dict = { + "m.upload.size": config.max_upload_size, + } + + def render_GET(self, request): + self._async_render_GET(request) + return NOT_DONE_YET + + @wrap_json_request_handler + @defer.inlineCallbacks + def _async_render_GET(self, request): + yield self.auth.get_user_by_req(request) + respond_with_json(request, 200, self.limits_dict) + + def render_OPTIONS(self, request): + respond_with_json(request, 200, {}, send_cors=True) + return NOT_DONE_YET diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 4c589e05e0..241c972070 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -42,6 +42,7 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import is_ascii, random_string from ._base import FileInfo, respond_404, respond_with_responder +from .config_resource import MediaConfigResource from .download_resource import DownloadResource from .filepath import MediaFilePaths from .identicon_resource import IdenticonResource @@ -754,7 +755,6 @@ class MediaRepositoryResource(Resource): Resource.__init__(self) media_repo = hs.get_media_repository() - self.putChild("upload", UploadResource(hs, media_repo)) self.putChild("download", DownloadResource(hs, media_repo)) self.putChild("thumbnail", ThumbnailResource( @@ -765,3 +765,4 @@ class MediaRepositoryResource(Resource): self.putChild("preview_url", PreviewUrlResource( hs, media_repo, media_repo.media_storage, )) + self.putChild("config", MediaConfigResource(hs)) diff --git a/synapse/server.py b/synapse/server.py index 140be9ebe8..26228d8c72 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient from synapse.federation.federation_server import ( FederationHandlerRegistry, FederationServer, + ReplicationFederationHandlerRegistry, ) from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.transaction_queue import TransactionQueue @@ -423,7 +424,10 @@ class HomeServer(object): return RoomMemberMasterHandler(self) def build_federation_registry(self): - return FederationHandlerRegistry() + if self.config.worker_app: + return ReplicationFederationHandlerRegistry(self) + else: + return FederationHandlerRegistry() def build_server_notices_manager(self): if self.config.worker_app: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 44f37b4c1e..08dffd774f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1150,17 +1150,16 @@ class SQLBaseStore(object): defer.returnValue(retval) def get_user_count_txn(self, txn): - """Get a total number of registerd users in the users list. + """Get a total number of registered users in the users list. Args: txn : Transaction object Returns: - defer.Deferred: resolves to int + int : number of users """ sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" txn.execute(sql_count) - count = txn.fetchone()[0] - defer.returnValue(count) + return txn.fetchone()[0] def _simple_search_list(self, table, term, col, retcols, desc="_simple_search_list"): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d4aa192a0a..025a7fb6d9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1436,88 +1436,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore ) @defer.inlineCallbacks - def have_events_in_timeline(self, event_ids): - """Given a list of event ids, check if we have already processed and - stored them as non outliers. - """ - rows = yield self._simple_select_many_batch( - table="events", - retcols=("event_id",), - column="event_id", - iterable=list(event_ids), - keyvalues={"outlier": False}, - desc="have_events_in_timeline", - ) - - defer.returnValue(set(r["event_id"] for r in rows)) - - @defer.inlineCallbacks - def have_seen_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Args: - event_ids (iterable[str]): - - Returns: - Deferred[set[str]]: The events we have already seen. - """ - results = set() - - def have_seen_events_txn(txn, chunk): - sql = ( - "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" - % (",".join("?" * len(chunk)), ) - ) - txn.execute(sql, chunk) - for (event_id, ) in txn: - results.add(event_id) - - # break the input up into chunks of 100 - input_iterator = iter(event_ids) - for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), - []): - yield self.runInteraction( - "have_seen_events", - have_seen_events_txn, - chunk, - ) - defer.returnValue(results) - - def get_seen_events_with_rejections(self, event_ids): - """Given a list of event ids, check if we rejected them. - - Args: - event_ids (list[str]) - - Returns: - Deferred[dict[str, str|None): - Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps - to None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction("get_rejection_reasons", f) - - @defer.inlineCallbacks def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. @@ -1993,7 +1911,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore max_depth = max(row[0] for row in rows) if max_depth <= token.topological: - # We need to ensure we don't delete all the events from the datanase + # We need to ensure we don't delete all the events from the database # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) raise SynapseError( diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 9b4cfeb899..59822178ff 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging from collections import namedtuple @@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_cache.prefill((original_ev.event_id,), cache_entry) defer.returnValue(cache_entry) + + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + """Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) + + defer.returnValue(set(r["event_id"] for r in rows)) + + @defer.inlineCallbacks + def have_seen_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Args: + event_ids (iterable[str]): + + Returns: + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + + Returns: + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction("get_rejection_reasons", f) diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 7e417f811e..06f9a75a97 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -96,7 +96,10 @@ class MonthlyActiveUsersStore(SQLBaseStore): # While Postgres does not require 'LIMIT', but also does not support # negative LIMIT values. So there is no way to write it that both can # support - query_args = [self.hs.config.max_mau_value] + safe_guard = self.hs.config.max_mau_value - len(self.reserved_users) + # Must be greater than zero for postgres + safe_guard = safe_guard if safe_guard > 0 else 0 + query_args = [safe_guard] base_sql = """ DELETE FROM monthly_active_users diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 3147fb6827..3378fc77d1 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple( class RoomWorkerStore(SQLBaseStore): + def get_room(self, room_id): + """Retrieve a room. + + Args: + room_id (str): The ID of the room to retrieve. + Returns: + A namedtuple containing the room information, or an empty list. + """ + return self._simple_select_one( + table="rooms", + keyvalues={"room_id": room_id}, + retcols=("room_id", "is_public", "creator"), + desc="get_room", + allow_none=True, + ) + def get_public_room_ids(self): return self._simple_select_onecol( table="rooms", @@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore): logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") - def get_room(self, room_id): - """Retrieve a room. - - Args: - room_id (str): The ID of the room to retrieve. - Returns: - A namedtuple containing the room information, or an empty list. - """ - return self._simple_select_one( - table="rooms", - keyvalues={"room_id": room_id}, - retcols=("room_id", "is_public", "creator"), - desc="get_room", - allow_none=True, - ) - @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): def set_room_is_public_txn(txn, next_id): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 17b14d464b..dd03c4168b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -116,6 +116,69 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): _get_current_state_ids_txn, ) + # FIXME: how should this be cached? + def get_filtered_current_state_ids(self, room_id, types, filtered_types=None): + """Get the current state event of a given type for a room based on the + current_state_events table. This may not be as up-to-date as the result + of doing a fresh state resolution as per state_handler.get_current_state + Args: + room_id (str) + types (list[(Str, (Str|None))]): List of (type, state_key) tuples + which are used to filter the state fetched. `state_key` may be + None, which matches any `state_key` + filtered_types (list[Str]|None): List of types to apply the above filter to. + Returns: + deferred: dict of (type, state_key) -> event + """ + + include_other_types = False if filtered_types is None else True + + def _get_filtered_current_state_ids_txn(txn): + results = {} + sql = """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? %s""" + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type seperately. + if types: + clause_to_args = [ + ( + "AND type = ? AND state_key = ?", + (etype, state_key) + ) if state_key is not None else ( + "AND type = ?", + (etype,) + ) + for etype, state_key in types + ] + + if include_other_types: + unique_types = set(filtered_types) + clause_to_args.append( + ( + "AND type <> ? " * len(unique_types), + list(unique_types) + ) + ) + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. + clause_to_args = [("", [])] + for where_clause, where_args in clause_to_args: + args = [room_id] + args.extend(where_args) + txn.execute(sql % (where_clause,), args) + for row in txn: + typ, state_key, event_id = row + key = (intern_string(typ), intern_string(state_key)) + results[key] = event_id + return results + + return self.runInteraction( + "get_filtered_current_state_ids", + _get_filtered_current_state_ids_txn, + ) + @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): """Given a state group try to return a previous group and a delta between @@ -389,8 +452,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): If None, `types` filtering is applied to all events. Returns: - deferred: A list of dicts corresponding to the event_ids given. - The dicts are mappings from (type, state_key) -> state_events + deferred: A dict of (event_id) -> (type, state_key) -> [state_events] """ event_to_groups = yield self._get_state_group_for_events( event_ids, @@ -418,7 +480,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): """ - Get the state dicts corresponding to a list of events + Get the state dicts corresponding to a list of events, containing the event_ids + of the state events (as opposed to the events themselves) Args: event_ids(list(str)): events whose state should be returned @@ -431,7 +494,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): If None, `types` filtering is applied to all events. Returns: - A deferred dict from event_id -> (type, state_key) -> state_event + A deferred dict from event_id -> (type, state_key) -> event_id """ event_to_groups = yield self._get_state_group_for_events( event_ids, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b9f2b74ac6..4c296d72c0 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -348,7 +348,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): end_token (str): The stream token representing now. Returns: - Deferred[tuple[list[FrozenEvent], str]]: Returns a list of + Deferred[tuple[list[FrozenEvent], str]]: Returns a list of events and a token pointing to the start of the returned events. The events returned are in ascending order. @@ -379,7 +379,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): end_token (str): The stream token representing now. Returns: - Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of + Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of _EventDictReturn and a token pointing to the start of the returned events. The events returned are in ascending order. |