diff options
author | Paul "LeoNerd" Evans <paul@matrix.org> | 2015-12-10 16:21:00 +0000 |
---|---|---|
committer | Paul "LeoNerd" Evans <paul@matrix.org> | 2015-12-10 16:21:00 +0000 |
commit | d7ee7b589f0535c21301f38e93b0cabc0cf288d4 (patch) | |
tree | fcd7d110dc66d5e175f1030d10e0bbd5624bbf3c /synapse/handlers | |
parent | Don't complain if /make_join response lacks 'prev_state' list (SYN-517) (diff) | |
parent | Merge pull request #432 from matrix-org/pushrules_refactor (diff) | |
download | synapse-d7ee7b589f0535c21301f38e93b0cabc0cf288d4.tar.xz |
Merge branch 'develop' into paul/tiny-fixes
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 10 | ||||
-rw-r--r-- | synapse/handlers/account_data.py (renamed from synapse/handlers/private_user_data.py) | 27 | ||||
-rw-r--r-- | synapse/handlers/admin.py | 33 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 37 | ||||
-rw-r--r-- | synapse/handlers/events.py | 20 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 28 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 14 | ||||
-rw-r--r-- | synapse/handlers/message.py | 87 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 20 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 28 | ||||
-rw-r--r-- | synapse/handlers/register.py | 18 | ||||
-rw-r--r-- | synapse/handlers/room.py | 34 | ||||
-rw-r--r-- | synapse/handlers/search.py | 210 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 102 |
14 files changed, 451 insertions, 217 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6519f183df..5fd20285d2 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -92,7 +92,15 @@ class BaseHandler(object): membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: - membership = membership_event.membership + was_forgotten_at_event = yield self.store.was_forgotten_at( + membership_event.state_key, + membership_event.room_id, + membership_event.event_id + ) + if was_forgotten_at_event: + membership = None + else: + membership = membership_event.membership else: membership = None diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/account_data.py index 1abe45ed7b..fe773bee9b 100644 --- a/synapse/handlers/private_user_data.py +++ b/synapse/handlers/account_data.py @@ -16,22 +16,23 @@ from twisted.internet import defer -class PrivateUserDataEventSource(object): +class AccountDataEventSource(object): def __init__(self, hs): self.store = hs.get_datastore() def get_current_key(self, direction='f'): - return self.store.get_max_private_user_data_stream_id() + return self.store.get_max_account_data_stream_id() @defer.inlineCallbacks def get_new_events(self, user, from_key, **kwargs): user_id = user.to_string() last_stream_id = from_key - current_stream_id = yield self.store.get_max_private_user_data_stream_id() - tags = yield self.store.get_updated_tags(user_id, last_stream_id) + current_stream_id = yield self.store.get_max_account_data_stream_id() results = [] + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + for room_id, room_tags in tags.items(): results.append({ "type": "m.tag", @@ -39,6 +40,24 @@ class PrivateUserDataEventSource(object): "room_id": room_id, }) + account_data, room_account_data = ( + yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) + ) + + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + }) + + for room_id, account_data in room_account_data.items(): + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + "room_id": room_id, + }) + defer.returnValue((results, current_stream_id)) @defer.inlineCallbacks diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index d852a18555..04fa58df65 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -30,34 +30,27 @@ class AdminHandler(BaseHandler): @defer.inlineCallbacks def get_whois(self, user): - res = yield self.store.get_user_ip_and_agents(user) - - d = {} - for r in res: - # Note that device_id is always None - device = d.setdefault(r["device_id"], {}) - session = device.setdefault(r["access_token"], []) - session.append({ - "ip": r["ip"], - "user_agent": r["user_agent"], - "last_seen": r["last_seen"], + connections = [] + + sessions = yield self.store.get_user_ip_and_agents(user) + for session in sessions: + connections.append({ + "ip": session["ip"], + "last_seen": session["last_seen"], + "user_agent": session["user_agent"], }) ret = { "user_id": user.to_string(), - "devices": [ - { - "device_id": k, + "devices": { + "": { "sessions": [ { - # "access_token": x, TODO (erikj) - "connections": y, + "connections": connections, } - for x, y in v.items() ] - } - for k, v in d.items() - ], + }, + }, } defer.returnValue(ret) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 1b11dbdffd..e64b67cdfd 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -18,7 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.constants import LoginType from synapse.types import UserID -from synapse.api.errors import LoginError, Codes +from synapse.api.errors import AuthError, LoginError, Codes from synapse.util.async import run_on_reactor from twisted.web.client import PartialDownloadError @@ -46,6 +46,7 @@ class AuthHandler(BaseHandler): } self.bcrypt_rounds = hs.config.bcrypt_rounds self.sessions = {} + self.INVALID_TOKEN_HTTP_STATUS = 401 @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): @@ -297,10 +298,11 @@ class AuthHandler(BaseHandler): defer.returnValue((user_id, access_token, refresh_token)) @defer.inlineCallbacks - def login_with_cas_user_id(self, user_id): + def get_login_tuple_for_user_id(self, user_id): """ - Authenticates the user with the given user ID, - intended to have been captured from a CAS response + Gets login tuple for the user with the given user ID. + The user is assumed to have been authenticated by some other + machanism (e.g. CAS) Args: user_id (str): User ID @@ -393,6 +395,23 @@ class AuthHandler(BaseHandler): )) return m.serialize() + def generate_short_term_login_token(self, user_id): + macaroon = self._generate_base_macaroon(user_id) + macaroon.add_first_party_caveat("type = login") + now = self.hs.get_clock().time_msec() + expiry = now + (2 * 60 * 1000) + macaroon.add_first_party_caveat("time < %d" % (expiry,)) + return macaroon.serialize() + + def validate_short_term_login_token_and_get_user_id(self, login_token): + try: + macaroon = pymacaroons.Macaroon.deserialize(login_token) + auth_api = self.hs.get_auth() + auth_api.validate_macaroon(macaroon, "login", True) + return self._get_user_from_macaroon(macaroon) + except (pymacaroons.exceptions.MacaroonException, TypeError, ValueError): + raise AuthError(401, "Invalid token", errcode=Codes.UNKNOWN_TOKEN) + def _generate_base_macaroon(self, user_id): macaroon = pymacaroons.Macaroon( location=self.hs.config.server_name, @@ -402,6 +421,16 @@ class AuthHandler(BaseHandler): macaroon.add_first_party_caveat("user_id = %s" % (user_id,)) return macaroon + def _get_user_from_macaroon(self, macaroon): + user_prefix = "user_id = " + for caveat in macaroon.caveats: + if caveat.caveat_id.startswith(user_prefix): + return caveat.caveat_id[len(user_prefix):] + raise AuthError( + self.INVALID_TOKEN_HTTP_STATUS, "No user_id found in token", + errcode=Codes.UNKNOWN_TOKEN + ) + @defer.inlineCallbacks def set_password(self, user_id, newpassword): password_hash = self.hash(newpassword) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 0e4c0d4d06..fe300433e6 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -28,6 +28,18 @@ import random logger = logging.getLogger(__name__) +def started_user_eventstream(distributor, user): + return distributor.fire("started_user_eventstream", user) + + +def stopped_user_eventstream(distributor, user): + return distributor.fire("stopped_user_eventstream", user) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -66,7 +78,7 @@ class EventStreamHandler(BaseHandler): except: logger.exception("Failed to cancel event timer") else: - yield self.distributor.fire("started_user_eventstream", user) + yield started_user_eventstream(self.distributor, user) self._streams_per_user[user] += 1 @@ -89,7 +101,7 @@ class EventStreamHandler(BaseHandler): self._stop_timer_per_user.pop(user, None) - return self.distributor.fire("stopped_user_eventstream", user) + return stopped_user_eventstream(self.distributor, user) logger.debug("Scheduling _later: for %s", user) self._stop_timer_per_user[user] = ( @@ -120,9 +132,7 @@ class EventStreamHandler(BaseHandler): timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) if is_guest: - yield self.distributor.fire( - "user_joined_room", user=auth_user, room_id=room_id - ) + yield user_joined_room(self.distributor, auth_user, room_id) events, tokens = yield self.notifier.get_events_for( auth_user, pagin_config, timeout, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c1bce07e31..2855f2d7c3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,6 +44,10 @@ import logging logger = logging.getLogger(__name__) +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -60,10 +64,7 @@ class FederationHandler(BaseHandler): self.hs = hs - self.distributor.observe( - "user_joined_room", - self._on_user_joined - ) + self.distributor.observe("user_joined_room", self.user_joined_room) self.waiting_for_join_list = {} @@ -176,7 +177,7 @@ class FederationHandler(BaseHandler): ) try: - _, event_stream_id, max_stream_id = yield self._handle_new_event( + context, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -233,10 +234,13 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.membership == Membership.JOIN: - user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally + # joined the room. Don't bother if the user is just + # changing their profile info. + user = UserID.from_string(event.state_key) + yield user_joined_room(self.distributor, user, event.room_id) @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): @@ -733,9 +737,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + yield user_joined_room(self.distributor, user, event.room_id) new_pdu = event @@ -1082,7 +1084,7 @@ class FederationHandler(BaseHandler): return self.store.get_min_depth(context) @log_function - def _on_user_joined(self, user, room_id): + def user_joined_room(self, user, room_id): waiters = self.waiting_for_join_list.get( (user.to_string(), room_id), [] diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 2a99921d5f..f1fa562fff 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -20,7 +20,6 @@ from synapse.api.errors import ( CodeMessageException ) from ._base import BaseHandler -from synapse.http.client import SimpleHttpClient from synapse.util.async import run_on_reactor from synapse.api.errors import SynapseError @@ -35,13 +34,12 @@ class IdentityHandler(BaseHandler): def __init__(self, hs): super(IdentityHandler, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + @defer.inlineCallbacks def threepid_from_creds(self, creds): yield run_on_reactor() - # TODO: get this from the homeserver rather than creating a new one for - # each request - http_client = SimpleHttpClient(self.hs) # XXX: make this configurable! # trustedIdServers = ['matrix.org', 'localhost:8090'] trustedIdServers = ['matrix.org', 'vector.im'] @@ -67,7 +65,7 @@ class IdentityHandler(BaseHandler): data = {} try: - data = yield http_client.get_json( + data = yield self.http_client.get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid" @@ -85,7 +83,6 @@ class IdentityHandler(BaseHandler): def bind_threepid(self, creds, mxid): yield run_on_reactor() logger.debug("binding threepid %r to %s", creds, mxid) - http_client = SimpleHttpClient(self.hs) data = None if 'id_server' in creds: @@ -103,7 +100,7 @@ class IdentityHandler(BaseHandler): raise SynapseError(400, "No client_secret in creds") try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/bind" ), @@ -121,7 +118,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): yield run_on_reactor() - http_client = SimpleHttpClient(self.hs) params = { 'email': email, @@ -131,7 +127,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 14051aee99..ccdd3d8473 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -26,11 +26,17 @@ from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler +from canonicaljson import encode_canonical_json + import logging logger = logging.getLogger(__name__) +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -195,10 +201,8 @@ class MessageHandler(BaseHandler): if membership == Membership.JOIN: joinee = UserID.from_string(builder.state_key) # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", - joinee, - builder.content + yield collect_presencelike_data( + self.distributor, joinee, builder.content ) if token_id is not None: @@ -211,6 +215,16 @@ class MessageHandler(BaseHandler): builder=builder, ) + if event.is_state(): + prev_state = context.current_state.get((event.type, event.state_key)) + if prev_state and event.user_id == prev_state.user_id: + prev_content = encode_canonical_json(prev_state.content) + next_content = encode_canonical_json(event.content) + if prev_content == next_content: + # Duplicate suppression for state updates with same sender + # and content. + defer.returnValue(prev_state) + if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler yield member_handler.change_membership(event, context, is_guest=is_guest) @@ -359,6 +373,10 @@ class MessageHandler(BaseHandler): tags_by_room = yield self.store.get_tags_for_user(user_id) + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user(user_id) + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -436,14 +454,22 @@ class MessageHandler(BaseHandler): for c in current_state.values() ] - private_user_data = [] + account_data_events = [] tags = tags_by_room.get(event.room_id) if tags: - private_user_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - d["private_user_data"] = private_user_data + + account_data = account_data_by_room.get(event.room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + d["account_data"] = account_data_events except: logger.exception("Failed to get snapshot") @@ -456,9 +482,17 @@ class MessageHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) + account_data_events = [] + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + ret = { "rooms": rooms_ret, "presence": presence, + "account_data": account_data_events, "receipts": receipt, "end": now_token.to_string(), } @@ -498,14 +532,22 @@ class MessageHandler(BaseHandler): user_id, room_id, pagin_config, membership, member_event_id, is_guest ) - private_user_data = [] + account_data_events = [] tags = yield self.store.get_tags_for_room(user_id, room_id) if tags: - private_user_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - result["private_user_data"] = private_user_data + + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + result["account_data"] = account_data_events defer.returnValue(result) @@ -588,23 +630,28 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - states = {} - if not is_guest: - states = yield presence_handler.get_states( - target_users=[UserID.from_string(m.user_id) for m in room_members], - auth_user=auth_user, - as_event=True, - check_auth=False, - ) + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, + ) defer.returnValue(states.values()) - receipts_handler = self.hs.get_handlers().receipts_handler + @defer.inlineCallbacks + def get_receipts(): + receipts_handler = self.hs.get_handlers().receipts_handler + receipts = yield receipts_handler.get_receipts_for_room( + room_id, + now_token.receipt_key + ) + defer.returnValue(receipts) presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), - receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), + get_receipts(), self.store.get_recent_events_for_room( room_id, limit=limit, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aca65096fc..63d6f30a7b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -62,6 +62,14 @@ def partitionbool(l, func): return ret.get(True, []), ret.get(False, []) +def user_presence_changed(distributor, user, statuscache): + return distributor.fire("user_presence_changed", user, statuscache) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class PresenceHandler(BaseHandler): STATE_LEVELS = { @@ -361,9 +369,7 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_state( target_user.localpart, state_to_store ) - yield self.distributor.fire( - "collect_presencelike_data", target_user, state - ) + yield collect_presencelike_data(self.distributor, target_user, state) if now_level > was_level: state["last_active"] = self.clock.time_msec() @@ -467,7 +473,7 @@ class PresenceHandler(BaseHandler): ) @defer.inlineCallbacks - def send_invite(self, observer_user, observed_user): + def send_presence_invite(self, observer_user, observed_user): """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -878,7 +884,7 @@ class PresenceHandler(BaseHandler): room_ids=room_ids, statuscache=statuscache, ) - yield self.distributor.fire("user_presence_changed", user, statuscache) + yield user_presence_changed(self.distributor, user, statuscache) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -1116,9 +1122,7 @@ class PresenceHandler(BaseHandler): self._user_cachemap[user].get_state()["last_active"] ) - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) + yield collect_presencelike_data(self.distributor, user, state) if "last_active" in state: state = dict(state) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 799faffe53..576c6f09b4 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -28,6 +28,14 @@ import logging logger = logging.getLogger(__name__) +def changed_presencelike_data(distributor, user, state): + return distributor.fire("changed_presencelike_data", user, state) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class ProfileHandler(BaseHandler): def __init__(self, hs): @@ -95,11 +103,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "displayname": new_displayname, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "displayname": new_displayname, + }) yield self._update_join_states(target_user) @@ -144,11 +150,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "avatar_url": new_avatar_url, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "avatar_url": new_avatar_url, + }) yield self._update_join_states(target_user) @@ -208,9 +212,7 @@ class ProfileHandler(BaseHandler): "membership": Membership.JOIN, } - yield self.distributor.fire( - "collect_presencelike_data", user, content - ) + yield collect_presencelike_data(self.distributor, user, content) msg_handler = self.hs.get_handlers().message_handler try: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 493a087031..a037da0f70 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -31,6 +31,10 @@ import urllib logger = logging.getLogger(__name__) +def registered_user(distributor, user): + return distributor.fire("registered_user", user) + + class RegistrationHandler(BaseHandler): def __init__(self, hs): @@ -38,6 +42,7 @@ class RegistrationHandler(BaseHandler): self.distributor = hs.get_distributor() self.distributor.declare("registered_user") + self.captch_client = CaptchaServerHttpClient(hs) @defer.inlineCallbacks def check_username(self, localpart): @@ -98,7 +103,7 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) else: # autogen a random user ID attempts = 0 @@ -117,7 +122,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=password_hash) - self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except SynapseError: # if user id is taken, just generate another user_id = None @@ -167,7 +172,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - self.distributor.fire("registered_user", user) + registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -215,7 +220,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=None ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except Exception, e: yield self.store.add_access_token_to_user(user_id, token) # Ignore Registration errors @@ -302,10 +307,7 @@ class RegistrationHandler(BaseHandler): """ Used only by c/s api v1 """ - # TODO: get this from the homeserver rather than creating a new one for - # each request - client = CaptchaServerHttpClient(self.hs) - data = yield client.post_urlencoded_get_raw( + data = yield self.captcha_client.post_urlencoded_get_raw( "http://www.google.com:80/recaptcha/api/verify", args={ 'privatekey': private_key, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3f04752581..116a998c42 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -41,6 +41,18 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + +def user_left_room(distributor, user, room_id): + return distributor.fire("user_left_room", user=user, room_id=room_id) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user=user, room_id=room_id) + + class RoomCreationHandler(BaseHandler): PRESETS_DICT = { @@ -438,9 +450,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.JOIN: user = UserID.from_string(event.user_id) - self.distributor.fire( - "user_left_room", user=user, room_id=event.room_id - ) + user_left_room(self.distributor, user, event.room_id) defer.returnValue({"room_id": room_id}) @@ -458,9 +468,7 @@ class RoomMemberHandler(BaseHandler): raise SynapseError(404, "No known servers") # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", joinee, content - ) + yield collect_presencelike_data(self.distributor, joinee, content) content.update({"membership": Membership.JOIN}) builder = self.event_builder_factory.new({ @@ -517,10 +525,13 @@ class RoomMemberHandler(BaseHandler): do_auth=do_auth, ) - user = UserID.from_string(event.user_id) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=room_id - ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + user = UserID.from_string(event.user_id) + yield user_joined_room(self.distributor, user, room_id) @defer.inlineCallbacks def get_inviter(self, event): @@ -743,6 +754,9 @@ class RoomMemberHandler(BaseHandler): ) defer.returnValue((token, public_key, key_validity_url, display_name)) + def forget(self, user, room_id): + self.store.forget(user.to_string(), room_id) + class RoomListHandler(BaseHandler): diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index b7545c111f..bc79564287 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -17,13 +17,14 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.api.constants import Membership +from synapse.api.constants import Membership, EventTypes from synapse.api.filtering import Filter from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event from unpaddedbase64 import decode_base64, encode_base64 +import itertools import logging @@ -79,6 +80,9 @@ class SearchHandler(BaseHandler): # What to order results by (impacts whether pagination can be doen) order_by = room_cat.get("order_by", "rank") + # Return the current state of the rooms? + include_state = room_cat.get("include_state", False) + # Include context around each event? event_context = room_cat.get( "event_context", None @@ -96,6 +100,10 @@ class SearchHandler(BaseHandler): after_limit = int(event_context.get( "after_limit", 5 )) + + # Return the historic display name and avatar for the senders + # of the events? + include_profile = bool(event_context.get("include_profile", False)) except KeyError: raise SynapseError(400, "Invalid search query") @@ -123,6 +131,17 @@ class SearchHandler(BaseHandler): if batch_group == "room_id": room_ids.intersection_update({batch_group_key}) + if not room_ids: + defer.returnValue({ + "search_categories": { + "room_events": { + "results": [], + "count": 0, + "highlights": [], + } + } + }) + rank_map = {} # event_id -> rank of event allowed_events = [] room_groups = {} # Holds result of grouping by room, if applicable @@ -131,11 +150,18 @@ class SearchHandler(BaseHandler): # Holds the next_batch for the entire result set if one of those exists global_next_batch = None + highlights = set() + if order_by == "rank": - results = yield self.store.search_msgs( + search_result = yield self.store.search_msgs( room_ids, search_term, keys ) + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + results = search_result["results"] + results_map = {r["event"].event_id: r for r in results} rank_map.update({r["event"].event_id: r["rank"] for r in results}) @@ -163,80 +189,76 @@ class SearchHandler(BaseHandler): s["results"].append(e.event_id) elif order_by == "recent": - # In this case we specifically loop through each room as the given - # limit applies to each room, rather than a global list. - # This is not necessarilly a good idea. - for room_id in room_ids: - room_events = [] - if batch_group == "room_id" and batch_group_key == room_id: - pagination_token = batch_token - else: - pagination_token = None - i = 0 - - # We keep looping and we keep filtering until we reach the limit - # or we run out of things. - # But only go around 5 times since otherwise synapse will be sad. - while len(room_events) < search_filter.limit() and i < 5: - i += 1 - results = yield self.store.search_room( - room_id, search_term, keys, search_filter.limit() * 2, - pagination_token=pagination_token, - ) + room_events = [] + i = 0 + + pagination_token = batch_token + + # We keep looping and we keep filtering until we reach the limit + # or we run out of things. + # But only go around 5 times since otherwise synapse will be sad. + while len(room_events) < search_filter.limit() and i < 5: + i += 1 + search_result = yield self.store.search_rooms( + room_ids, search_term, keys, search_filter.limit() * 2, + pagination_token=pagination_token, + ) - results_map = {r["event"].event_id: r for r in results} + if search_result["highlights"]: + highlights.update(search_result["highlights"]) - rank_map.update({r["event"].event_id: r["rank"] for r in results}) + results = search_result["results"] - filtered_events = search_filter.filter([ - r["event"] for r in results - ]) + results_map = {r["event"].event_id: r for r in results} - events = yield self._filter_events_for_client( - user.to_string(), filtered_events - ) + rank_map.update({r["event"].event_id: r["rank"] for r in results}) - room_events.extend(events) - room_events = room_events[:search_filter.limit()] + filtered_events = search_filter.filter([ + r["event"] for r in results + ]) - if len(results) < search_filter.limit() * 2: - pagination_token = None - break - else: - pagination_token = results[-1]["pagination_token"] - - if room_events: - res = results_map[room_events[-1].event_id] - pagination_token = res["pagination_token"] - - group = room_groups.setdefault(room_id, {}) - if pagination_token: - next_batch = encode_base64("%s\n%s\n%s" % ( - "room_id", room_id, pagination_token - )) - group["next_batch"] = next_batch - - if batch_token: - global_next_batch = next_batch - - group["results"] = [e.event_id for e in room_events] - group["order"] = max( - e.origin_server_ts/1000 for e in room_events - if hasattr(e, "origin_server_ts") - ) + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) - allowed_events.extend(room_events) + room_events.extend(events) + room_events = room_events[:search_filter.limit()] - # Normalize the group orders - if room_groups: - if len(room_groups) > 1: - mx = max(g["order"] for g in room_groups.values()) - mn = min(g["order"] for g in room_groups.values()) + if len(results) < search_filter.limit() * 2: + pagination_token = None + break + else: + pagination_token = results[-1]["pagination_token"] - for g in room_groups.values(): - g["order"] = (g["order"] - mn) * 1.0 / (mx - mn) + for event in room_events: + group = room_groups.setdefault(event.room_id, { + "results": [], + }) + group["results"].append(event.event_id) + + if room_events and len(room_events) >= search_filter.limit(): + last_event_id = room_events[-1].event_id + pagination_token = results_map[last_event_id]["pagination_token"] + + # We want to respect the given batch group and group keys so + # that if people blindly use the top level `next_batch` token + # it returns more from the same group (if applicable) rather + # than reverting to searching all results again. + if batch_group and batch_group_key: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + batch_group, batch_group_key, pagination_token + )) else: - room_groups.values()[0]["order"] = 1 + global_next_batch = encode_base64("%s\n%s\n%s" % ( + "all", "", pagination_token + )) + + for room_id, group in room_groups.items(): + group["next_batch"] = encode_base64("%s\n%s\n%s" % ( + "room_id", room_id, pagination_token + )) + + allowed_events.extend(room_events) else: # We should never get here due to the guard earlier. @@ -269,6 +291,33 @@ class SearchHandler(BaseHandler): "room_key", res["end"] ).to_string() + if include_profile: + senders = set( + ev.sender + for ev in itertools.chain( + res["events_before"], [event], res["events_after"] + ) + ) + + if res["events_after"]: + last_event_id = res["events_after"][-1].event_id + else: + last_event_id = event.event_id + + state = yield self.store.get_state_for_event( + last_event_id, + types=[(EventTypes.Member, sender) for sender in senders] + ) + + res["profile_info"] = { + s.state_key: { + "displayname": s.content.get("displayname", None), + "avatar_url": s.content.get("avatar_url", None), + } + for s in state.values() + if s.type == EventTypes.Member and s.state_key in senders + } + contexts[event.event_id] = res else: contexts = {} @@ -287,22 +336,39 @@ class SearchHandler(BaseHandler): for e in context["events_after"] ] - results = { - e.event_id: { + state_results = {} + if include_state: + rooms = set(e.room_id for e in allowed_events) + for room_id in rooms: + state = yield self.state_handler.get_current_state(room_id) + state_results[room_id] = state.values() + + state_results.values() + + # We're now about to serialize the events. We should not make any + # blocking calls after this. Otherwise the 'age' will be wrong + + results = [ + { "rank": rank_map[e.event_id], "result": serialize_event(e, time_now), "context": contexts.get(e.event_id, {}), } for e in allowed_events - } - - logger.info("Found %d results", len(results)) + ] rooms_cat_res = { "results": results, - "count": len(results) + "count": len(results), + "highlights": list(highlights), } + if state_results: + rooms_cat_res["state"] = { + room_id: [serialize_event(e, time_now) for e in state] + for room_id, state in state_results.items() + } + if room_groups and "room_id" in group_keys: rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6dc9d0fb92..24c2b2fad6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -51,7 +51,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "timeline", # TimelineBatch "state", # dict[(str, str), FrozenEvent] "ephemeral", - "private_user_data", + "account_data", ])): __slots__ = [] @@ -63,7 +63,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ self.timeline or self.state or self.ephemeral - or self.private_user_data + or self.account_data ) @@ -71,7 +71,7 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", # str "timeline", # TimelineBatch "state", # dict[(str, str), FrozenEvent] - "private_user_data", + "account_data", ])): __slots__ = [] @@ -82,7 +82,7 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ return bool( self.timeline or self.state - or self.private_user_data + or self.account_data ) @@ -100,6 +100,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. + "account_data", # List of account_data events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. @@ -185,13 +186,19 @@ class SyncHandler(BaseHandler): pagination_config=pagination_config.get_source_config("presence"), key=None ) + + membership_list = (Membership.INVITE, Membership.JOIN) + if sync_config.filter.include_leave: + membership_list += (Membership.LEAVE, Membership.BAN) + room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=( - Membership.INVITE, - Membership.JOIN, - Membership.LEAVE, - Membership.BAN + membership_list=membership_list + ) + + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() ) ) @@ -211,6 +218,7 @@ class SyncHandler(BaseHandler): timeline_since_token=timeline_since_token, ephemeral_by_room=ephemeral_by_room, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -230,11 +238,13 @@ class SyncHandler(BaseHandler): leave_token=leave_token, timeline_since_token=timeline_since_token, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) archived.append(room_sync) defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -244,7 +254,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -261,20 +272,39 @@ class SyncHandler(BaseHandler): timeline=batch, state=current_state, ephemeral=ephemeral_by_room.get(room_id, []), - private_user_data=self.private_user_data_for_room( - room_id, tags_by_room + account_data=self.account_data_for_room( + room_id, tags_by_room, account_data_by_room ), )) - def private_user_data_for_room(self, room_id, tags_by_room): - private_user_data = [] + def account_data_for_user(self, account_data): + account_data_events = [] + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events + + def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): + account_data_events = [] tags = tags_by_room.get(room_id) if tags is not None: - private_user_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - return private_user_data + + account_data = account_data_by_room.get(room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): @@ -341,7 +371,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token, tags_by_room): + timeline_since_token, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -357,8 +388,8 @@ class SyncHandler(BaseHandler): room_id=room_id, timeline=batch, state=leave_state, - private_user_data=self.private_user_data_for_room( - room_id, tags_by_room + account_data=self.account_data_for_room( + room_id, tags_by_room, account_data_by_room ), )) @@ -412,7 +443,14 @@ class SyncHandler(BaseHandler): tags_by_room = yield self.store.get_updated_tags( sync_config.user.to_string(), - since_token.private_user_data_key, + since_token.account_data_key, + ) + + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + sync_config.user.to_string(), + since_token.account_data_key, + ) ) joined = [] @@ -468,8 +506,8 @@ class SyncHandler(BaseHandler): ), state=state, ephemeral=ephemeral_by_room.get(room_id, []), - private_user_data=self.private_user_data_for_room( - room_id, tags_by_room + account_data=self.account_data_for_room( + room_id, tags_by_room, account_data_by_room ), ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -492,14 +530,15 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room + ephemeral_by_room, tags_by_room, account_data_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room + sync_config, leave_event, since_token, tags_by_room, + account_data_by_room ) archived.append(room_sync) @@ -510,6 +549,7 @@ class SyncHandler(BaseHandler): defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -566,7 +606,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -605,8 +646,8 @@ class SyncHandler(BaseHandler): timeline=batch, state=state, ephemeral=ephemeral_by_room.get(room_id, []), - private_user_data=self.private_user_data_for_room( - room_id, tags_by_room + account_data=self.account_data_for_room( + room_id, tags_by_room, account_data_by_room ), ) @@ -616,7 +657,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token, tags_by_room): + since_token, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -653,8 +695,8 @@ class SyncHandler(BaseHandler): room_id=leave_event.room_id, timeline=batch, state=state_events_delta, - private_user_data=self.private_user_data_for_room( - leave_event.room_id, tags_by_room + account_data=self.account_data_for_room( + leave_event.room_id, tags_by_room, account_data_by_room ), ) |