diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/api/errors.py | 1 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 19 | ||||
-rw-r--r-- | synapse/config/server.py | 10 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 1 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 19 | ||||
-rw-r--r-- | synapse/handlers/events.py | 25 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 204 | ||||
-rw-r--r-- | synapse/handlers/register.py | 19 | ||||
-rw-r--r-- | synapse/rest/client/v1/directory.py | 4 | ||||
-rw-r--r-- | synapse/rest/client/v1/events.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 2 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 26 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 1 | ||||
-rw-r--r-- | synapse/storage/events.py | 14 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 20 | ||||
-rw-r--r-- | synapse/storage/schema/delta/50/make_event_content_nullable.py | 2 |
17 files changed, 257 insertions, 114 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 5c0f2f83aa..1810cb6fcd 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -17,4 +17,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.33.0" +__version__ = "0.33.1" diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 6074df292f..14f5540280 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -55,6 +55,7 @@ class Codes(object): SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN" CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" + MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED" class CodeMessageException(RuntimeError): diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 57b815d777..fba51c26e8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,6 +20,8 @@ import sys from six import iteritems +from prometheus_client import Gauge + from twisted.application import service from twisted.internet import defer, reactor from twisted.web.resource import EncodingResourceWrapper, NoResource @@ -300,6 +302,11 @@ class SynapseHomeServer(HomeServer): quit_with_error(e.message) +# Gauges to expose monthly active user control metrics +current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU") +max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit") + + def setup(config_options): """ Args: @@ -512,6 +519,18 @@ def run(hs): # table will decrease clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + @defer.inlineCallbacks + def generate_monthly_active_users(): + count = 0 + if hs.config.limit_usage_by_mau: + count = yield hs.get_datastore().count_monthly_users() + current_mau_gauge.set(float(count)) + max_mau_value_gauge.set(float(hs.config.max_mau_value)) + + generate_monthly_active_users() + if hs.config.limit_usage_by_mau: + clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) + if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) diff --git a/synapse/config/server.py b/synapse/config/server.py index 18102656b0..6a471a0a5e 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -67,6 +67,14 @@ class ServerConfig(Config): "block_non_admin_invites", False, ) + # Options to control access by tracking MAU + self.limit_usage_by_mau = config.get("limit_usage_by_mau", False) + if self.limit_usage_by_mau: + self.max_mau_value = config.get( + "max_mau_value", 0, + ) + else: + self.max_mau_value = 0 # FIXME: federation_domain_whitelist needs sytests self.federation_domain_whitelist = None federation_domain_whitelist = config.get( @@ -209,6 +217,8 @@ class ServerConfig(Config): # different cores. See # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/. # + # This setting requires the affinity package to be installed! + # # cpu_affinity: 0xFFFFFFFF # Whether to serve a web client from the HTTP/HTTPS root resource. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 657935d1ac..bf89d568af 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -426,6 +426,7 @@ class FederationServer(FederationBase): ret = yield self.handler.on_query_auth( origin, event_id, + room_id, signed_auth, content.get("rejects", []), content.get("missing", []), diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 5d03bfa5f7..184eef09d0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -520,6 +520,7 @@ class AuthHandler(BaseHandler): """ logger.info("Logging in user %s on device %s", user_id, device_id) access_token = yield self.issue_access_token(user_id, device_id) + yield self._check_mau_limits() # the device *should* have been registered before we got here; however, # it's possible we raced against a DELETE operation. The thing we @@ -731,15 +732,18 @@ class AuthHandler(BaseHandler): device_id) defer.returnValue(access_token) + @defer.inlineCallbacks def validate_short_term_login_token_and_get_user_id(self, login_token): + yield self._check_mau_limits() auth_api = self.hs.get_auth() + user_id = None try: macaroon = pymacaroons.Macaroon.deserialize(login_token) user_id = auth_api.get_user_id_from_macaroon(macaroon) auth_api.validate_macaroon(macaroon, "login", True, user_id) - return user_id except Exception: raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN) + defer.returnValue(user_id) @defer.inlineCallbacks def delete_access_token(self, access_token): @@ -903,6 +907,19 @@ class AuthHandler(BaseHandler): else: return defer.succeed(False) + @defer.inlineCallbacks + def _check_mau_limits(self): + """ + Ensure that if mau blocking is enabled that invalid users cannot + log in. + """ + if self.hs.config.limit_usage_by_mau is True: + current_mau = yield self.store.count_monthly_users() + if current_mau >= self.hs.config.max_mau_value: + raise AuthError( + 403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED + ) + @attr.s class MacaroonGenerator(object): diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index c3f2d7feff..f772e62c28 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -19,10 +19,12 @@ import random from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.api.errors import AuthError from synapse.events import EventBase from synapse.events.utils import serialize_event from synapse.types import UserID from synapse.util.logutils import log_function +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -129,11 +131,13 @@ class EventStreamHandler(BaseHandler): class EventHandler(BaseHandler): @defer.inlineCallbacks - def get_event(self, user, event_id): + def get_event(self, user, room_id, event_id): """Retrieve a single specified event. Args: user (synapse.types.UserID): The user requesting the event + room_id (str|None): The expected room id. We'll return None if the + event's room does not match. event_id (str): The event ID to obtain. Returns: dict: An event, or None if there is no event matching this ID. @@ -142,13 +146,26 @@ class EventHandler(BaseHandler): AuthError if the user does not have the rights to inspect this event. """ - event = yield self.store.get_event(event_id) + event = yield self.store.get_event(event_id, check_room_id=room_id) if not event: defer.returnValue(None) return - if hasattr(event, "room_id"): - yield self.auth.check_joined_room(event.room_id, user.to_string()) + users = yield self.store.get_users_in_room(event.room_id) + is_peeking = user.to_string() not in users + + filtered = yield filter_events_for_client( + self.store, + user.to_string(), + [event], + is_peeking=is_peeking + ) + + if not filtered: + raise AuthError( + 403, + "You don't have permission to access that event." + ) defer.returnValue(event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 91d8def08b..533b82c783 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -400,7 +400,7 @@ class FederationHandler(BaseHandler): ) try: - event_stream_id, max_stream_id = yield self._persist_auth_tree( + yield self._persist_auth_tree( origin, auth_chain, state, event ) except AuthError as e: @@ -444,7 +444,7 @@ class FederationHandler(BaseHandler): yield self._handle_new_events(origin, event_infos) try: - context, event_stream_id, max_stream_id = yield self._handle_new_event( + context = yield self._handle_new_event( origin, event, state=state, @@ -469,17 +469,6 @@ class FederationHandler(BaseHandler): except StoreError: logger.exception("Failed to store room.") - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: # Only fire user_joined_room if the user has acutally @@ -501,7 +490,7 @@ class FederationHandler(BaseHandler): if newly_joined: user = UserID.from_string(event.state_key) - yield user_joined_room(self.distributor, user, event.room_id) + yield self.user_joined_room(user, event.room_id) @log_function @defer.inlineCallbacks @@ -942,7 +931,7 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] - yield self.store.clean_room_for_join(room_id) + yield self._clean_room_for_join(room_id) handled_events = set() @@ -981,15 +970,10 @@ class FederationHandler(BaseHandler): # FIXME pass - event_stream_id, max_stream_id = yield self._persist_auth_tree( + yield self._persist_auth_tree( origin, auth_chain, state, event ) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[joinee] - ) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -1084,7 +1068,7 @@ class FederationHandler(BaseHandler): # would introduce the danger of backwards-compatibility problems. event.internal_metadata.send_on_behalf_of = origin - context, event_stream_id, max_stream_id = yield self._handle_new_event( + context = yield self._handle_new_event( origin, event ) @@ -1094,20 +1078,10 @@ class FederationHandler(BaseHandler): event.signatures, ) - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield user_joined_room(self.distributor, user, event.room_id) + yield self.user_joined_room(user, event.room_id) prev_state_ids = yield context.get_prev_state_ids(self.store) @@ -1176,17 +1150,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - ) - - target_user = UserID.from_string(event.state_key) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + yield self._persist_events([(event, context)]) defer.returnValue(event) @@ -1217,17 +1181,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - ) - - target_user = UserID.from_string(event.state_key) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + yield self._persist_events([(event, context)]) defer.returnValue(event) @@ -1318,7 +1272,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context, event_stream_id, max_stream_id = yield self._handle_new_event( + yield self._handle_new_event( origin, event ) @@ -1328,22 +1282,17 @@ class FederationHandler(BaseHandler): event.signatures, ) - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) - defer.returnValue(None) @defer.inlineCallbacks def get_state_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ + + event = yield self.store.get_event( + event_id, allow_none=False, check_room_id=room_id, + ) + state_groups = yield self.store.get_state_groups( room_id, [event_id] ) @@ -1354,8 +1303,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in state } - event = yield self.store.get_event(event_id) - if event and event.is_state(): + if event.is_state(): # Get previous state if "replaces_state" in event.unsigned: prev_id = event.unsigned["replaces_state"] @@ -1374,6 +1322,10 @@ class FederationHandler(BaseHandler): def get_state_ids_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ + event = yield self.store.get_event( + event_id, allow_none=False, check_room_id=room_id, + ) + state_groups = yield self.store.get_state_groups_ids( room_id, [event_id] ) @@ -1382,8 +1334,7 @@ class FederationHandler(BaseHandler): _, state = state_groups.items().pop() results = state - event = yield self.store.get_event(event_id) - if event and event.is_state(): + if event.is_state(): # Get previous state if "replaces_state" in event.unsigned: prev_id = event.unsigned["replaces_state"] @@ -1472,9 +1423,8 @@ class FederationHandler(BaseHandler): event, context ) - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, + yield self._persist_events( + [(event, context)], backfilled=backfilled, ) except: # noqa: E722, as we reraise the exception this is fine. @@ -1487,15 +1437,7 @@ class FederationHandler(BaseHandler): six.reraise(tp, value, tb) - if not backfilled: - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - logcontext.run_in_background( - self.pusher_pool.on_new_notifications, - event_stream_id, max_stream_id, - ) - - defer.returnValue((context, event_stream_id, max_stream_id)) + defer.returnValue(context) @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): @@ -1503,6 +1445,8 @@ class FederationHandler(BaseHandler): should not depend on one another, e.g. this should be used to persist a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. + + Notifies about the events where appropriate. """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ @@ -1517,7 +1461,7 @@ class FederationHandler(BaseHandler): ], consumeErrors=True, )) - yield self.store.persist_events( + yield self._persist_events( [ (ev_info["event"], context) for ev_info, context in zip(event_infos, contexts) @@ -1529,7 +1473,8 @@ class FederationHandler(BaseHandler): def _persist_auth_tree(self, origin, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. - Persists the event seperately. + Persists the event separately. Notifies about the persisted events + where appropriate. Will attempt to fetch missing auth events. @@ -1540,8 +1485,7 @@ class FederationHandler(BaseHandler): event (Event) Returns: - 2-tuple of (event_stream_id, max_stream_id) from the persist_event - call for `event` + Deferred """ events_to_context = {} for e in itertools.chain(auth_events, state): @@ -1605,7 +1549,7 @@ class FederationHandler(BaseHandler): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self.store.persist_events( + yield self._persist_events( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) @@ -1616,12 +1560,10 @@ class FederationHandler(BaseHandler): event, old_state=state ) - event_stream_id, max_stream_id = yield self.store.persist_event( - event, new_event_context, + yield self._persist_events( + [(event, new_event_context)], ) - defer.returnValue((event_stream_id, max_stream_id)) - @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): """ @@ -1678,8 +1620,19 @@ class FederationHandler(BaseHandler): defer.returnValue(context) @defer.inlineCallbacks - def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, + def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects, missing): + in_room = yield self.auth.check_host_in_room( + room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + + event = yield self.store.get_event( + event_id, allow_none=False, check_room_id=room_id + ) + # Just go through and process each event in `remote_auth_chain`. We # don't want to fall into the trap of `missing` being wrong. for e in remote_auth_chain: @@ -1689,7 +1642,6 @@ class FederationHandler(BaseHandler): pass # Now get the current auth_chain for the event. - event = yield self.store.get_event(event_id) local_auth_chain = yield self.store.get_auth_chain( [auth_id for auth_id, _ in event.auth_events], include_given=True @@ -2347,3 +2299,69 @@ class FederationHandler(BaseHandler): ) if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") + + @defer.inlineCallbacks + def _persist_events(self, event_and_contexts, backfilled=False): + """Persists events and tells the notifier/pushers about them, if + necessary. + + Args: + event_and_contexts(list[tuple[FrozenEvent, EventContext]]) + backfilled (bool): Whether these events are a result of + backfilling or not + + Returns: + Deferred + """ + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled, + ) + + if not backfilled: # Never notify for backfilled events + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) + + def _notify_persisted_event(self, event, max_stream_id): + """Checks to see if notifier/pushers should be notified about the + event or not. + + Args: + event (FrozenEvent) + max_stream_id (int): The max_stream_id returned by persist_events + """ + + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + + # We notify for memberships if its an invite for one of our + # users + if event.internal_metadata.is_outlier(): + if event.membership != Membership.INVITE: + if not self.is_mine_id(target_user_id): + return + + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) + elif event.internal_metadata.is_outlier(): + return + + event_stream_id = event.internal_metadata.stream_ordering + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, + ) + + def _clean_room_for_join(self, room_id): + return self.store.clean_room_for_join(room_id) + + def user_joined_room(self, user, room_id): + """Called when a new user has joined the room + """ + return user_joined_room(self.distributor, user, room_id) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 234f8e8019..289704b241 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -45,7 +45,7 @@ class RegistrationHandler(BaseHandler): hs (synapse.server.HomeServer): """ super(RegistrationHandler, self).__init__(hs) - + self.hs = hs self.auth = hs.get_auth() self._auth_handler = hs.get_auth_handler() self.profile_handler = hs.get_profile_handler() @@ -144,6 +144,7 @@ class RegistrationHandler(BaseHandler): Raises: RegistrationError if there was a problem registering. """ + yield self._check_mau_limits() password_hash = None if password: password_hash = yield self.auth_handler().hash(password) @@ -288,6 +289,7 @@ class RegistrationHandler(BaseHandler): 400, "User ID can only contain characters a-z, 0-9, or '=_-./'", ) + yield self._check_mau_limits() user = UserID(localpart, self.hs.hostname) user_id = user.to_string() @@ -437,7 +439,7 @@ class RegistrationHandler(BaseHandler): """ if localpart is None: raise SynapseError(400, "Request must include user id") - + yield self._check_mau_limits() need_register = True try: @@ -531,3 +533,16 @@ class RegistrationHandler(BaseHandler): remote_room_hosts=remote_room_hosts, action="join", ) + + @defer.inlineCallbacks + def _check_mau_limits(self): + """ + Do not accept registrations if monthly active user limits exceeded + and limiting is enabled + """ + if self.hs.config.limit_usage_by_mau is True: + current_mau = yield self.store.count_monthly_users() + if current_mau >= self.hs.config.max_mau_value: + raise RegistrationError( + 403, "MAU Limit Exceeded", Codes.MAU_LIMIT_EXCEEDED + ) diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 69dcd618cb..97733f3026 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -18,7 +18,7 @@ import logging from twisted.internet import defer -from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.http.servlet import parse_json_object_from_request from synapse.types import RoomAlias @@ -159,7 +159,7 @@ class ClientDirectoryListServer(ClientV1RestServlet): def on_GET(self, request, room_id): room = yield self.store.get_room(room_id) if room is None: - raise SynapseError(400, "Unknown room") + raise NotFoundError("Unknown room") defer.returnValue((200, { "visibility": "public" if room["is_public"] else "private" diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index b70c9c2806..0f3a2e8b51 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -88,7 +88,7 @@ class EventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, event_id): requester = yield self.auth.get_user_by_req(request) - event = yield self.event_handler.get_event(requester.user, event_id) + event = yield self.event_handler.get_event(requester.user, None, event_id) time_now = self.clock.time_msec() if event: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 13c331550b..fa5989e74e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -506,7 +506,7 @@ class RoomEventServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): requester = yield self.auth.get_user_by_req(request) - event = yield self.event_handler.get_event(requester.user, event_id) + event = yield self.event_handler.get_event(requester.user, room_id, event_id) time_now = self.clock.time_msec() if event: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 3bd63cd195..134e4a80f1 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -94,6 +94,7 @@ class DataStore(RoomMemberStore, RoomStore, self._clock = hs.get_clock() self.database_engine = hs.database_engine + self.db_conn = db_conn self._stream_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", extra_tables=[("local_invites", "stream_id")] @@ -266,6 +267,31 @@ class DataStore(RoomMemberStore, RoomStore, return self.runInteraction("count_users", _count_users) + def count_monthly_users(self): + """Counts the number of users who used this homeserver in the last 30 days + + This method should be refactored with count_daily_users - the only + reason not to is waiting on definition of mau + + Returns: + Defered[int] + """ + def _count_monthly_users(txn): + thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) + sql = """ + SELECT COALESCE(count(*), 0) FROM ( + SELECT user_id FROM user_ips + WHERE last_seen > ? + GROUP BY user_id + ) u + """ + + txn.execute(sql, (thirty_days_ago,)) + count, = txn.fetchone() + return count + + return self.runInteraction("count_monthly_users", _count_monthly_users) + def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8bd35df119..24345b20a6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -343,6 +343,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, table="events", keyvalues={ "event_id": event_id, + "room_id": room_id, }, retcol="depth", allow_none=True, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 61223da1a5..e8e5a0fe44 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -241,12 +241,18 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore self._state_resolution_handler = hs.get_state_resolution_handler() + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) - backfilled: ? + backfilled (bool): Whether the results are retrieved from federation + via backfill or not. Used to determine if they're "new" events + which might update the current state etc. + + Returns: + Deferred[int]: the stream ordering of the latest persisted event """ partitioned = {} for event, ctx in events_and_contexts: @@ -263,10 +269,14 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore for room_id in partitioned: self._maybe_start_persisting(room_id) - return make_deferred_yieldable( + yield make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) + max_persisted_id = yield self._stream_id_gen.get_current_token() + + defer.returnValue(max_persisted_id) + @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index f28239a808..9b4cfeb899 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -19,7 +19,7 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.errors import SynapseError +from synapse.api.errors import NotFoundError # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events import FrozenEvent @@ -77,7 +77,7 @@ class EventsWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, - allow_none=False): + allow_none=False, check_room_id=None): """Get an event from the database by event_id. Args: @@ -88,7 +88,9 @@ class EventsWorkerStore(SQLBaseStore): include the previous states content in the unsigned field. allow_rejected (bool): If True return rejected events. allow_none (bool): If True, return None if no event found, if - False throw an exception. + False throw a NotFoundError + check_room_id (str|None): if not None, check the room of the found event. + If there is a mismatch, behave as per allow_none. Returns: Deferred : A FrozenEvent. @@ -100,10 +102,16 @@ class EventsWorkerStore(SQLBaseStore): allow_rejected=allow_rejected, ) - if not events and not allow_none: - raise SynapseError(404, "Could not find event %s" % (event_id,)) + event = events[0] if events else None - defer.returnValue(events[0] if events else None) + if event is not None and check_room_id is not None: + if event.room_id != check_room_id: + event = None + + if event is None and not allow_none: + raise NotFoundError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) @defer.inlineCallbacks def get_events(self, event_ids, check_redacted=True, diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py index 7d27342e39..6dd467b6c5 100644 --- a/synapse/storage/schema/delta/50/make_event_content_nullable.py +++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py @@ -88,5 +88,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs): "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'", (sql, ), ) - cur.execute("PRAGMA schema_version=%i" % (oldver+1,)) + cur.execute("PRAGMA schema_version=%i" % (oldver + 1,)) cur.execute("PRAGMA writable_schema=OFF") |