diff options
Diffstat (limited to 'synapse')
26 files changed, 453 insertions, 107 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 2474a1453b..007a0998a7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -120,6 +120,24 @@ class Auth(object): return allowed self.check_event_sender_in_room(event, auth_events) + + # Special case to allow m.room.third_party_invite events wherever + # a user is allowed to issue invites. Fixes + # https://github.com/vector-im/vector-web/issues/1208 hopefully + if event.type == EventTypes.ThirdPartyInvite: + user_level = self._get_user_power_level(event.user_id, auth_events) + invite_level = self._get_named_level(auth_events, "invite", 0) + + if user_level < invite_level: + raise AuthError( + 403, ( + "You cannot issue a third party invite for %s." % + (event.content.display_name,) + ) + ) + else: + return True + self._can_send_event(event, auth_events) if event.type == EventTypes.PowerLevels: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 47a4e9f864..9afc8fd754 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -56,22 +56,22 @@ import logging logger = logging.getLogger(__name__) -class AppServiceScheduler(object): +class ApplicationServiceScheduler(object): """ Public facing API for this module. Does the required DI to tie the components together. This also serves as the "event_pool", which in this case is a simple array. """ - def __init__(self, clock, store, as_api): - self.clock = clock - self.store = store - self.as_api = as_api + def __init__(self, hs): + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.as_api = hs.get_application_service_api() def create_recoverer(service, callback): - return _Recoverer(clock, store, as_api, service, callback) + return _Recoverer(self.clock, self.store, self.as_api, service, callback) self.txn_ctrl = _TransactionController( - clock, store, as_api, create_recoverer + self.clock, self.store, self.as_api, create_recoverer ) self.queuer = _ServiceQueuer(self.txn_ctrl) diff --git a/synapse/config/server.py b/synapse/config/server.py index 0b5f462e44..c2d8f8a52f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -29,6 +29,7 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", True) self.public_baseurl = config.get("public_baseurl") + self.secondary_directory_servers = config.get("secondary_directory_servers", []) if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': @@ -156,6 +157,15 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # A list of other Home Servers to fetch the public room directory from + # and include in the public room directory of this home server + # This is a temporary stopgap solution to populate new server with a + # list of rooms until there exists a good solution of a decentralized + # room directory. + # secondary_directory_servers: + # - matrix.org + # - vector.im + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 37ee469fa2..d835c1b038 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,6 +24,7 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -551,6 +552,25 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks + def get_public_rooms(self, destinations): + results_by_server = {} + + @defer.inlineCallbacks + def _get_result(s): + if s == self.server_name: + defer.returnValue() + + try: + result = yield self.transport_layer.get_public_rooms(s) + results_by_server[s] = result + except: + logger.exception("Error getting room list from server %r", s) + + yield concurrently_execute(_get_result, destinations, 3) + + defer.returnValue(results_by_server) + + @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): """ Params: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cd2841c4db..ebb698e278 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -226,6 +226,18 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function + def get_public_rooms(self, remote_server): + path = PREFIX + "/publicRooms" + + response = yield self.client.get_json( + destination=remote_server, + path=path, + ) + + defer.returnValue(response) + + @defer.inlineCallbacks + @log_function def exchange_third_party_invite(self, destination, room_id, event_dict): path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5b6c7d11dd..a1a334955f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -134,10 +134,12 @@ class Authenticator(object): class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter, server_name): + def __init__(self, handler, authenticator, ratelimiter, server_name, + room_list_handler): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter + self.room_list_handler = room_list_handler def _wrap(self, code): authenticator = self.authenticator @@ -492,6 +494,50 @@ class OpenIdUserInfo(BaseFederationServlet): return code +class PublicRoomList(BaseFederationServlet): + """ + Fetch the public room list for this server. + + This API returns information in the same format as /publicRooms on the + client API, but will only ever include local public rooms and hence is + intended for consumption by other home servers. + + GET /publicRooms HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "chunk": [ + { + "aliases": [ + "#test:localhost" + ], + "guest_can_join": false, + "name": "test room", + "num_joined_members": 3, + "room_id": "!whkydVegtvatLfXmPN:localhost", + "world_readable": false + } + ], + "end": "END", + "start": "START" + } + """ + + PATH = "/publicRooms" + + @defer.inlineCallbacks + def on_GET(self, request): + data = yield self.room_list_handler.get_local_public_room_list() + defer.returnValue((200, data)) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -513,6 +559,7 @@ SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, OpenIdUserInfo, + PublicRoomList, ) @@ -523,4 +570,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter): authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, + room_list_handler=hs.get_room_list_handler(), ).register(resource) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 9442ae6f1d..c0069e23d6 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,11 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.appservice.scheduler import AppServiceScheduler -from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( - RoomCreationHandler, RoomListHandler, RoomContextHandler, + RoomCreationHandler, RoomContextHandler, ) from .room_member import RoomMemberHandler from .message import MessageHandler @@ -26,7 +24,6 @@ from .federation import FederationHandler from .profile import ProfileHandler from .directory import DirectoryHandler from .admin import AdminHandler -from .appservice import ApplicationServicesHandler from .auth import AuthHandler from .identity import IdentityHandler from .receipts import ReceiptsHandler @@ -50,18 +47,9 @@ class Handlers(object): self.event_handler = EventHandler(hs) self.federation_handler = FederationHandler(hs) self.profile_handler = ProfileHandler(hs) - self.room_list_handler = RoomListHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) self.receipts_handler = ReceiptsHandler(hs) - asapi = ApplicationServiceApi(hs) - self.appservice_handler = ApplicationServicesHandler( - hs, asapi, AppServiceScheduler( - clock=hs.get_clock(), - store=hs.get_datastore(), - as_api=asapi - ) - ) self.auth_handler = AuthHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 75fc74c797..051ccdb380 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService -from synapse.types import UserID import logging @@ -35,16 +34,13 @@ def log_failure(failure): ) -# NB: Purposefully not inheriting BaseHandler since that contains way too much -# setup code which this handler does not need or use. This makes testing a lot -# easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api, appservice_scheduler): + def __init__(self, hs): self.store = hs.get_datastore() - self.hs = hs - self.appservice_api = appservice_api - self.scheduler = appservice_scheduler + self.is_mine_id = hs.is_mine_id + self.appservice_api = hs.get_application_service_api() + self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False @defer.inlineCallbacks @@ -169,8 +165,7 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def _is_unknown_user(self, user_id): - user = UserID.from_string(user_id) - if not self.hs.is_mine(user): + if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. defer.returnValue(False) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 68d0d78fc6..26c865e171 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -18,7 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.constants import LoginType from synapse.types import UserID -from synapse.api.errors import AuthError, LoginError, Codes +from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError from synapse.util.async import run_on_reactor from twisted.web.client import PartialDownloadError @@ -563,7 +563,12 @@ class AuthHandler(BaseHandler): except_access_token_ids = [requester.access_token_id] if requester else [] - yield self.store.user_set_password_hash(user_id, password_hash) + try: + yield self.store.user_set_password_hash(user_id, password_hash) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Unknown user", Codes.NOT_FOUND) + raise e yield self.store.user_delete_access_tokens( user_id, except_access_token_ids ) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8eeb225811..4bea7f2b19 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler): super(DirectoryHandler, self).__init__(hs) self.state = hs.get_state_handler() + self.appservice_handler = hs.get_application_service_handler() self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler): ) if not result: # Query AS to see if it exists - as_handler = self.hs.get_handlers().appservice_handler + as_handler = self.appservice_handler result = yield as_handler.query_room_alias_exists(room_alias) defer.returnValue(result) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 5883b9111e..16f33f8371 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -16,7 +16,7 @@ """Contains functions for registering clients.""" from twisted.internet import defer -from synapse.types import UserID +from synapse.types import UserID, Requester from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) @@ -360,7 +360,8 @@ class RegistrationHandler(BaseHandler): @defer.inlineCallbacks def get_or_create_user(self, localpart, displayname, duration_seconds): - """Creates a new user or returns an access token for an existing one + """Creates a new user if the user does not exist, + else revokes all previous access tokens and generates a new one. Args: localpart : The local part of the user ID to register. If None, @@ -399,14 +400,14 @@ class RegistrationHandler(BaseHandler): yield registered_user(self.distributor, user) else: - yield self.store.flush_user(user_id=user_id) + yield self.store.user_delete_access_tokens(user_id=user_id) yield self.store.add_access_token_to_user(user_id=user_id, token=token) if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) profile_handler = self.hs.get_handlers().profile_handler yield profile_handler.set_displayname( - user, user, displayname + user, Requester(user, token, False), displayname ) defer.returnValue((user_id, token)) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3d63b3c513..9fd34588dd 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -36,6 +36,8 @@ import string logger = logging.getLogger(__name__) +REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 + id_server_scheme = "https://" @@ -344,8 +346,14 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache() + self.remote_list_request_cache = ResponseCache() + self.remote_list_cache = {} + self.fetch_looping_call = hs.get_clock().looping_call( + self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL + ) + self.fetch_all_remote_lists() - def get_public_room_list(self): + def get_local_public_room_list(self): result = self.response_cache.get(()) if not result: result = self.response_cache.set((), self._get_public_room_list()) @@ -427,6 +435,55 @@ class RoomListHandler(BaseHandler): # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) + @defer.inlineCallbacks + def fetch_all_remote_lists(self): + deferred = self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + self.remote_list_request_cache.set((), deferred) + self.remote_list_cache = yield deferred + + @defer.inlineCallbacks + def get_aggregated_public_room_list(self): + """ + Get the public room list from this server and the servers + specified in the secondary_directory_servers config option. + XXX: Pagination... + """ + # We return the results from out cache which is updated by a looping call, + # unless we're missing a cache entry, in which case wait for the result + # of the fetch if there's one in progress. If not, omit that server. + wait = False + for s in self.hs.config.secondary_directory_servers: + if s not in self.remote_list_cache: + logger.warn("No cached room list from %s: waiting for fetch", s) + wait = True + break + + if wait and self.remote_list_request_cache.get(()): + yield self.remote_list_request_cache.get(()) + + public_rooms = yield self.get_local_public_room_list() + + # keep track of which room IDs we've seen so we can de-dup + room_ids = set() + + # tag all the ones in our list with our server name. + # Also add the them to the de-deping set + for room in public_rooms['chunk']: + room["server_name"] = self.hs.hostname + room_ids.add(room["room_id"]) + + # Now add the results from federation + for server_name, server_result in self.remote_list_cache.items(): + for room in server_result["chunk"]: + if room["room_id"] not in room_ids: + room["server_name"] = server_name + public_rooms["chunk"].append(room) + room_ids.add(room["room_id"]) + + defer.returnValue(public_rooms) + class RoomContextHandler(BaseHandler): @defer.inlineCallbacks diff --git a/synapse/notifier.py b/synapse/notifier.py index 33b79c0ec7..cbec4d30ae 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -140,8 +140,6 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 def __init__(self, hs): - self.hs = hs - self.user_to_user_stream = {} self.room_to_user_streams = {} self.appservice_to_user_streams = {} @@ -151,6 +149,8 @@ class Notifier(object): self.pending_new_room_events = [] self.clock = hs.get_clock() + self.appservice_handler = hs.get_application_service_handler() + self.state_handler = hs.get_state_handler() hs.get_distributor().observe( "user_joined_room", self._user_joined_room @@ -232,9 +232,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - self.hs.get_handlers().appservice_handler.notify_interested_services( - event - ) + self.appservice_handler.notify_interested_services(event) app_streams = set() @@ -449,7 +447,7 @@ class Notifier(object): @defer.inlineCallbacks def _is_world_readable(self, room_id): - state = yield self.hs.get_state_handler().get_current_state( + state = yield self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 25e13b3423..25f2fb9da4 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -29,6 +29,7 @@ logger = logging.getLogger(__name__) def decode_rule_json(rule): + rule = dict(rule) rule['conditions'] = json.loads(rule['conditions']) rule['actions'] = json.loads(rule['actions']) return rule @@ -39,6 +40,8 @@ def _get_rules(room_id, user_ids, store): rules_by_user = yield store.bulk_get_push_rules(user_ids) rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids) + rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} + rules_by_user = { uid: list_with_base_rules([ decode_rule_json(rule_list) @@ -51,11 +54,10 @@ def _get_rules(room_id, user_ids, store): # fetch disabled rules, but this won't account for any server default # rules the user has disabled, so we need to do this too. for uid in user_ids: - if uid not in rules_enabled_by_user: + user_enabled_map = rules_enabled_by_user.get(uid) + if not user_enabled_map: continue - user_enabled_map = rules_enabled_by_user[uid] - for i, rule in enumerate(rules_by_user[uid]): rule_id = rule['rule_id'] diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index b4b728adc5..e38ed02006 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -32,12 +32,19 @@ DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000 # Each room maintains its own throttle counter, but each new mail notification # sends the pending notifications for all rooms. THROTTLE_START_MS = 10 * 60 * 1000 -THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # (2 * 60 * 1000) * (2 ** 11) # ~3 days -THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours +THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # 24h +# THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours +THROTTLE_MULTIPLIER = 144 # 10 mins, 24 hours - i.e. jump straight to 1 day # If no event triggers a notification for this long after the previous, # the throttle is released. -THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days +# 12 hours - a gap of 12 hours in conversation is surely enough to merit a new +# notification when things get going again... +THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000) + +# does each email include all unread notifs, or just the ones which have happened +# since the last mail? +INCLUDE_ALL_UNREAD_NOTIFS = True class EmailPusher(object): @@ -65,7 +72,12 @@ class EmailPusher(object): self.processing = False if self.hs.config.email_enable_notifs: - self.mailer = Mailer(self.hs) + if 'data' in pusherdict and 'brand' in pusherdict['data']: + app_name = pusherdict['data']['brand'] + else: + app_name = self.hs.config.email_app_name + + self.mailer = Mailer(self.hs, app_name) else: self.mailer = None @@ -126,8 +138,9 @@ class EmailPusher(object): up logging, measures and guards against multiple instances of it being run. """ + start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( - self.user_id, self.last_stream_ordering, self.max_stream_ordering + self.user_id, start, self.max_stream_ordering ) soonest_due_at = None @@ -150,7 +163,6 @@ class EmailPusher(object): # we then consider all previously outstanding notifications # to be delivered. - # debugging: reason = { 'room_id': push_action['room_id'], 'now': self.clock.time_msec(), @@ -165,9 +177,12 @@ class EmailPusher(object): yield self.save_last_stream_ordering_and_success(max([ ea['stream_ordering'] for ea in unprocessed ])) - yield self.sent_notif_update_throttle( - push_action['room_id'], push_action - ) + + # we update the throttle on all the possible unprocessed push actions + for ea in unprocessed: + yield self.sent_notif_update_throttle( + ea['room_id'], ea + ) break else: if soonest_due_at is None or should_notify_at < soonest_due_at: diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index c2c2ca3fa7..0e9d8ccb53 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -44,8 +44,11 @@ MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \ "in the %s room..." MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..." MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..." -MESSAGES_IN_ROOM = "There are some messages on %(app)s for you in the %(room)s room..." -MESSAGES_IN_ROOMS = "Here are some messages on %(app)s you may have missed..." +MESSAGES_IN_ROOM = "You have messages on %(app)s in the %(room)s room..." +MESSAGES_IN_ROOM_AND_OTHERS = \ + "You have messages on %(app)s in the %(room)s room and others..." +MESSAGES_FROM_PERSON_AND_OTHERS = \ + "You have messages on %(app)s from %(person)s and others..." INVITE_FROM_PERSON_TO_ROOM = "%(person)s has invited you to join the " \ "%(room)s room on %(app)s..." INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..." @@ -75,12 +78,12 @@ ALLOWED_ATTRS = { class Mailer(object): - def __init__(self, hs): + def __init__(self, hs, app_name): self.hs = hs self.store = self.hs.get_datastore() self.state_handler = self.hs.get_state_handler() loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) - self.app_name = self.hs.config.email_app_name + self.app_name = app_name env = jinja2.Environment(loader=loader) env.filters["format_ts"] = format_ts_filter env.filters["mxc_to_http"] = self.mxc_to_http_filter @@ -119,6 +122,8 @@ class Mailer(object): user_display_name = yield self.store.get_profile_displayname( UserID.from_string(user_id).localpart ) + if user_display_name is None: + user_display_name = user_id except StoreError: user_display_name = user_id @@ -128,9 +133,14 @@ class Mailer(object): state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email - # notifs are much realtime than sync so we can afford to wait a bit. + # notifs are much less realtime than sync so we can afford to wait a bit. yield concurrently_execute(_fetch_room_state, rooms_in_order, 3) + # actually sort our so-called rooms_in_order list, most recent room first + rooms_in_order.sort( + key=lambda r: -(notifs_by_room[r][-1]['received_ts'] or 0) + ) + rooms = [] for r in rooms_in_order: @@ -139,12 +149,12 @@ class Mailer(object): ) rooms.append(roomvars) - summary_text = self.make_summary_text( - notifs_by_room, state_by_room, notif_events, user_id + reason['room_name'] = calculate_room_name( + state_by_room[reason['room_id']], user_id, fallback_to_members=True ) - reason['room_name'] = calculate_room_name( - state_by_room[reason['room_id']], user_id, fallback_to_members=False + summary_text = self.make_summary_text( + notifs_by_room, state_by_room, notif_events, user_id, reason ) template_vars = { @@ -251,7 +261,9 @@ class Mailer(object): sender_state_event = room_state[("m.room.member", event.sender)] sender_name = name_from_member_event(sender_state_event) - sender_avatar_url = sender_state_event.content["avatar_url"] + sender_avatar_url = None + if "avatar_url" in sender_state_event.content: + sender_avatar_url = sender_state_event.content["avatar_url"] # 'hash' for deterministically picking default images: use # sender_hash % the number of default images to choose from @@ -296,7 +308,8 @@ class Mailer(object): return messagevars - def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id): + def make_summary_text(self, notifs_by_room, state_by_room, + notif_events, user_id, reason): if len(notifs_by_room) == 1: # Only one room has new stuff room_id = notifs_by_room.keys()[0] @@ -371,9 +384,28 @@ class Mailer(object): } else: # Stuff's happened in multiple different rooms - return MESSAGES_IN_ROOMS % { - "app": self.app_name, - } + + # ...but we still refer to the 'reason' room which triggered the mail + if reason['room_name'] is not None: + return MESSAGES_IN_ROOM_AND_OTHERS % { + "room": reason['room_name'], + "app": self.app_name, + } + else: + # If the reason room doesn't have a name, say who the messages + # are from explicitly to avoid, "messages in the Bob room" + sender_ids = list(set([ + notif_events[n['event_id']].sender + for n in notifs_by_room[reason['room_id']] + ])) + + return MESSAGES_FROM_PERSON_AND_OTHERS % { + "person": descriptor_from_member_events([ + state_by_room[reason['room_id']][("m.room.member", s)] + for s in sender_ids + ]), + "app": self.app_name, + } def make_room_link(self, room_id): # need /beta for Universal Links to work on iOS diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 644aa4e513..db52a1fc39 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -279,8 +279,9 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): - handler = self.handlers.room_list_handler - data = yield handler.get_public_room_list() + handler = self.hs.get_room_list_handler() + data = yield handler.get_aggregated_public_room_list() + defer.returnValue((200, data)) diff --git a/synapse/server.py b/synapse/server.py index 01f828819f..7cf22b1eea 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -22,6 +22,8 @@ from twisted.web.client import BrowserLikePolicyForHTTPS from twisted.enterprise import adbapi +from synapse.appservice.scheduler import ApplicationServiceScheduler +from synapse.appservice.api import ApplicationServiceApi from synapse.federation import initialize_http_replication from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.notifier import Notifier @@ -30,6 +32,8 @@ from synapse.handlers import Handlers from synapse.handlers.presence import PresenceHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler +from synapse.handlers.room import RoomListHandler +from synapse.handlers.appservice import ApplicationServicesHandler from synapse.state import StateHandler from synapse.storage import DataStore from synapse.util import Clock @@ -84,6 +88,10 @@ class HomeServer(object): 'presence_handler', 'sync_handler', 'typing_handler', + 'room_list_handler', + 'application_service_api', + 'application_service_scheduler', + 'application_service_handler', 'notifier', 'distributor', 'client_resource', @@ -179,6 +187,18 @@ class HomeServer(object): def build_sync_handler(self): return SyncHandler(self) + def build_room_list_handler(self): + return RoomListHandler(self) + + def build_application_service_api(self): + return ApplicationServiceApi(self) + + def build_application_service_scheduler(self): + return ApplicationServiceScheduler(self) + + def build_application_service_handler(self): + return ApplicationServicesHandler(self) + def build_event_sources(self): return EventSources(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d970fde9e8..8581796b7e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) -from ._base import Cache +from ._base import Cache, LoggingTransaction from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore, UserPresenceState @@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, db_conn, hs): self.hs = hs + self._clock = hs.get_clock() self.database_engine = hs.database_engine self.client_ip_last_seen = Cache( @@ -173,6 +174,19 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + cur = LoggingTransaction( + db_conn.cursor(), + name="_find_stream_orderings_for_times_txn", + database_engine=self.database_engine, + after_callbacks=[] + ) + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 60 * 60 * 1000 + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e0d7098692..56a0dd80f3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -153,7 +153,6 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() - self._clock = hs.get_clock() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 9705db5c47..940e11d7a2 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,6 +24,10 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): + def __init__(self, hs): + self.stream_ordering_month_ago = None + super(EventPushActionsStore, self).__init__(hs) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -115,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None): + max_stream_ordering=None, + limit=20): def get_after_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " @@ -147,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore): if max_stream_ordering is not None: sql += " AND ep.stream_ordering <= ?" args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering ASC" + sql += " ORDER BY ep.stream_ordering ASC LIMIT ?" + args.append(limit) txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( @@ -224,18 +230,93 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) - def _remove_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering): + def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + """ + Purges old, stale push actions for a user and room before a given + topological_ordering + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + topological_ordering: The lowest topological ordering which will + not be deleted. + """ txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, (room_id, user_id, ) ) + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. txn.execute( - "DELETE FROM event_push_actions" - " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", - (room_id, user_id, topological_ordering,) + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " topological_ordering < ? AND stream_ordering < ?", + (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) + ) + + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago ) + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + if max_stream_ordering is None: + return 0 + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + + return range_end + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index d2bf7f2aec..ebb97c8474 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from twisted.internet import defer import logging @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) class PushRuleStore(SQLBaseStore): - @cachedInlineCallbacks() + @cachedInlineCallbacks(lru=True) def get_push_rules_for_user(self, user_id): rows = yield self._simple_select_list( table="push_rules", @@ -44,7 +44,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rows) - @cachedInlineCallbacks() + @cachedInlineCallbacks(lru=True) def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", @@ -60,12 +60,16 @@ class PushRuleStore(SQLBaseStore): r['rule_id']: False if r['enabled'] == 0 else True for r in results }) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules(self, user_ids): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: [] + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules", @@ -75,18 +79,24 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules", ) - rows.sort(key=lambda e: (-e["priority_class"], -e["priority"])) + rows.sort( + key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) + ) for row in rows: results.setdefault(row['user_name'], []).append(row) defer.returnValue(results) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_enabled_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules_enabled(self, user_ids): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: {} + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules_enable", @@ -96,7 +106,8 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules_enabled", ) for row in rows: - results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] + enabled = bool(row['enabled']) + results.setdefault(row['user_name'], {})[row['rule_id']] = enabled defer.returnValue(results) @defer.inlineCallbacks diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index fdcf28f3e1..f1774f0e44 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore): ) if receipt_type == "m.read" and topological_ordering: - self._remove_push_actions_before_txn( + self._remove_old_push_actions_before_txn( txn, room_id=room_id, user_id=user_id, diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index b417e3ac08..5b7d8d1ab5 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from synapse.storage.appservice import ApplicationServiceStore +from synapse.config.appservice import load_appservices logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): logger.warning("Could not get app_service_config_files from config") pass - appservices = ApplicationServiceStore.load_appservices( + appservices = load_appservices( config.server_name, config_files ) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index b10f2a5787..ea6823f18d 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -19,17 +19,24 @@ from ._base import SQLBaseStore from unpaddedbase64 import encode_base64 from synapse.crypto.event_signing import compute_event_reference_hash +from synapse.util.caches.descriptors import cached, cachedList class SignatureStore(SQLBaseStore): """Persistence for event signatures and hashes""" + @cached(lru=True) + def get_event_reference_hash(self, event_id): + return self._get_event_reference_hashes_txn(event_id) + + @cachedList(cached_method_name="get_event_reference_hash", + list_name="event_ids", num_args=1) def get_event_reference_hashes(self, event_ids): def f(txn): - return [ - self._get_event_reference_hashes_txn(txn, ev) - for ev in event_ids - ] + return { + event_id: self._get_event_reference_hashes_txn(txn, event_id) + for event_id in event_ids + } return self.runInteraction( "get_event_reference_hashes", @@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore): hashes = yield self.get_event_reference_hashes( event_ids ) - hashes = [ - { + hashes = { + e_id: { k: encode_base64(v) for k, v in h.items() if k == "sha256" } - for h in hashes - ] + for e_id, h in hashes.items() + } - defer.returnValue(zip(event_ids, hashes)) + defer.returnValue(hashes.items()) def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index 3efa8a8206..a6866f6117 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -14,6 +14,9 @@ # limitations under the License. import re +import logging + +logger = logging.getLogger(__name__) # intentionally looser than what aliases we allow to be registered since # other HSes may allow aliases that we would not @@ -105,13 +108,21 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True): # or inbound invite, or outbound 3PID invite. if all_members[0].sender == user_id: if "m.room.third_party_invite" in room_state_bytype: - third_party_invites = room_state_bytype["m.room.third_party_invite"] + third_party_invites = ( + room_state_bytype["m.room.third_party_invite"].values() + ) + if len(third_party_invites) > 0: # technically third party invite events are not member # events, but they are close enough - return "Inviting %s" ( - descriptor_from_member_events(third_party_invites) - ) + + # FIXME: no they're not - they look nothing like a member; + # they have a great big encrypted thing as their name to + # prevent leaking the 3PID name... + # return "Inviting %s" % ( + # descriptor_from_member_events(third_party_invites) + # ) + return "Inviting email address" else: return ALL_ALONE else: |