diff options
author | Mark Haines <mjark@negativecurvature.net> | 2016-01-04 14:02:50 +0000 |
---|---|---|
committer | Mark Haines <mjark@negativecurvature.net> | 2016-01-04 14:02:50 +0000 |
commit | f35f8d06ea58e2d0cdccd82924c7a44fd93f4c38 (patch) | |
tree | dc5312558565f8ac01264be21d388e563a5c8c58 /synapse/handlers | |
parent | Added info abou Martin Giess' auto-deployment process with vagrant/ansible (diff) | |
parent | Bump changelog and version for v0.12.0 (diff) | |
download | synapse-f35f8d06ea58e2d0cdccd82924c7a44fd93f4c38.tar.xz |
Merge remote-tracking branch 'origin/release-v0.12.0' v0.12.0
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 10 | ||||
-rw-r--r-- | synapse/handlers/account_data.py | 21 | ||||
-rw-r--r-- | synapse/handlers/admin.py | 33 | ||||
-rw-r--r-- | synapse/handlers/events.py | 31 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 83 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 14 | ||||
-rw-r--r-- | synapse/handlers/message.py | 112 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 20 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 28 | ||||
-rw-r--r-- | synapse/handlers/register.py | 40 | ||||
-rw-r--r-- | synapse/handlers/room.py | 124 | ||||
-rw-r--r-- | synapse/handlers/search.py | 161 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 224 |
13 files changed, 625 insertions, 276 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6519f183df..5fd20285d2 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -92,7 +92,15 @@ class BaseHandler(object): membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: - membership = membership_event.membership + was_forgotten_at_event = yield self.store.was_forgotten_at( + membership_event.state_key, + membership_event.room_id, + membership_event.event_id + ) + if was_forgotten_at_event: + membership = None + else: + membership = membership_event.membership else: membership = None diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 1d35d3b7dc..fe773bee9b 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -29,9 +29,10 @@ class AccountDataEventSource(object): last_stream_id = from_key current_stream_id = yield self.store.get_max_account_data_stream_id() - tags = yield self.store.get_updated_tags(user_id, last_stream_id) results = [] + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + for room_id, room_tags in tags.items(): results.append({ "type": "m.tag", @@ -39,6 +40,24 @@ class AccountDataEventSource(object): "room_id": room_id, }) + account_data, room_account_data = ( + yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) + ) + + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + }) + + for room_id, account_data in room_account_data.items(): + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + "room_id": room_id, + }) + defer.returnValue((results, current_stream_id)) @defer.inlineCallbacks diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index d852a18555..04fa58df65 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -30,34 +30,27 @@ class AdminHandler(BaseHandler): @defer.inlineCallbacks def get_whois(self, user): - res = yield self.store.get_user_ip_and_agents(user) - - d = {} - for r in res: - # Note that device_id is always None - device = d.setdefault(r["device_id"], {}) - session = device.setdefault(r["access_token"], []) - session.append({ - "ip": r["ip"], - "user_agent": r["user_agent"], - "last_seen": r["last_seen"], + connections = [] + + sessions = yield self.store.get_user_ip_and_agents(user) + for session in sessions: + connections.append({ + "ip": session["ip"], + "last_seen": session["last_seen"], + "user_agent": session["user_agent"], }) ret = { "user_id": user.to_string(), - "devices": [ - { - "device_id": k, + "devices": { + "": { "sessions": [ { - # "access_token": x, TODO (erikj) - "connections": y, + "connections": connections, } - for x, y in v.items() ] - } - for k, v in d.items() - ], + }, + }, } defer.returnValue(ret) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 0e4c0d4d06..576d77e0e7 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -28,6 +28,18 @@ import random logger = logging.getLogger(__name__) +def started_user_eventstream(distributor, user): + return distributor.fire("started_user_eventstream", user) + + +def stopped_user_eventstream(distributor, user): + return distributor.fire("stopped_user_eventstream", user) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -57,7 +69,12 @@ class EventStreamHandler(BaseHandler): A deferred that completes once their presence has been updated. """ if user not in self._streams_per_user: - self._streams_per_user[user] = 0 + # Make sure we set the streams per user to 1 here rather than + # setting it to zero and incrementing the value below. + # Otherwise this may race with stopped_stream causing the + # user to be erased from the map before we have a chance + # to increment it. + self._streams_per_user[user] = 1 if user in self._stop_timer_per_user: try: self.clock.cancel_call_later( @@ -66,9 +83,9 @@ class EventStreamHandler(BaseHandler): except: logger.exception("Failed to cancel event timer") else: - yield self.distributor.fire("started_user_eventstream", user) - - self._streams_per_user[user] += 1 + yield started_user_eventstream(self.distributor, user) + else: + self._streams_per_user[user] += 1 def stopped_stream(self, user): """If there are no streams for a user this starts a timer that will @@ -89,7 +106,7 @@ class EventStreamHandler(BaseHandler): self._stop_timer_per_user.pop(user, None) - return self.distributor.fire("stopped_user_eventstream", user) + return stopped_user_eventstream(self.distributor, user) logger.debug("Scheduling _later: for %s", user) self._stop_timer_per_user[user] = ( @@ -120,9 +137,7 @@ class EventStreamHandler(BaseHandler): timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) if is_guest: - yield self.distributor.fire( - "user_joined_room", user=auth_user, room_id=room_id - ) + yield user_joined_room(self.distributor, auth_user, room_id) events, tokens = yield self.notifier.get_events_for( auth_user, pagin_config, timeout, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c1bce07e31..28f2ff68d6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,6 +44,10 @@ import logging logger = logging.getLogger(__name__) +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -60,10 +64,7 @@ class FederationHandler(BaseHandler): self.hs = hs - self.distributor.observe( - "user_joined_room", - self._on_user_joined - ) + self.distributor.observe("user_joined_room", self.user_joined_room) self.waiting_for_join_list = {} @@ -176,7 +177,7 @@ class FederationHandler(BaseHandler): ) try: - _, event_stream_id, max_stream_id = yield self._handle_new_event( + context, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -233,10 +234,13 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.membership == Membership.JOIN: - user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally + # joined the room. Don't bother if the user is just + # changing their profile info. + user = UserID.from_string(event.state_key) + yield user_joined_room(self.distributor, user, event.room_id) @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): @@ -592,7 +596,7 @@ class FederationHandler(BaseHandler): handled_events = set() try: - new_event = self._sign_event(event) + event = self._sign_event(event) # Try the host we successfully got a response to /make_join/ # request first. try: @@ -600,7 +604,7 @@ class FederationHandler(BaseHandler): target_hosts.insert(0, origin) except ValueError: pass - ret = yield self.replication_layer.send_join(target_hosts, new_event) + ret = yield self.replication_layer.send_join(target_hosts, event) origin = ret["origin"] state = ret["state"] @@ -609,12 +613,12 @@ class FederationHandler(BaseHandler): handled_events.update([s.event_id for s in state]) handled_events.update([a.event_id for a in auth_chain]) - handled_events.add(new_event.event_id) + handled_events.add(event.event_id) logger.debug("do_invite_join auth_chain: %s", auth_chain) logger.debug("do_invite_join state: %s", state) - logger.debug("do_invite_join event: %s", new_event) + logger.debug("do_invite_join event: %s", event) try: yield self.store.store_room( @@ -632,14 +636,14 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - new_event, event_stream_id, max_stream_id, + event, event_stream_id, max_stream_id, extra_users=[joinee] ) def log_failure(f): logger.warn( "Failed to notify about %s: %s", - new_event.event_id, f.value + event.event_id, f.value ) d.addErrback(log_failure) @@ -733,9 +737,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + yield user_joined_room(self.distributor, user, event.room_id) new_pdu = event @@ -1082,7 +1084,7 @@ class FederationHandler(BaseHandler): return self.store.get_min_depth(context) @log_function - def _on_user_joined(self, user, room_id): + def user_joined_room(self, user, room_id): waiters = self.waiting_for_join_list.get( (user.to_string(), room_id), [] @@ -1648,11 +1650,22 @@ class FederationHandler(BaseHandler): sender = invite["sender"] room_id = invite["room_id"] + if "signed" not in invite or "token" not in invite["signed"]: + logger.info( + "Discarding received notification of third party invite " + "without signed: %s" % (invite,) + ) + return + + third_party_invite = { + "signed": invite["signed"], + } + event_dict = { "type": EventTypes.Member, "content": { "membership": Membership.INVITE, - "third_party_invite": invite, + "third_party_invite": third_party_invite, }, "room_id": room_id, "sender": sender, @@ -1663,6 +1676,11 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) event, context = yield self._create_new_client_event(builder=builder) + + event, context = yield self.add_display_name_to_third_party_invite( + event_dict, event, context + ) + self.auth.check(event, context.current_state) yield self._validate_keyserver(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler @@ -1684,6 +1702,10 @@ class FederationHandler(BaseHandler): builder=builder, ) + event, context = yield self.add_display_name_to_third_party_invite( + event_dict, event, context + ) + self.auth.check(event, auth_events=context.current_state) yield self._validate_keyserver(event, auth_events=context.current_state) @@ -1694,6 +1716,27 @@ class FederationHandler(BaseHandler): yield member_handler.change_membership(event, context) @defer.inlineCallbacks + def add_display_name_to_third_party_invite(self, event_dict, event, context): + key = ( + EventTypes.ThirdPartyInvite, + event.content["third_party_invite"]["signed"]["token"] + ) + original_invite = context.current_state.get(key) + if not original_invite: + logger.info( + "Could not find invite event for third_party_invite - " + "discarding: %s" % (event_dict,) + ) + return + + display_name = original_invite.content["display_name"] + event_dict["content"]["third_party_invite"]["display_name"] = display_name + builder = self.event_builder_factory.new(event_dict) + EventValidator().validate_new(builder) + event, context = yield self._create_new_client_event(builder=builder) + defer.returnValue((event, context)) + + @defer.inlineCallbacks def _validate_keyserver(self, event, auth_events): token = event.content["third_party_invite"]["signed"]["token"] diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 2a99921d5f..f1fa562fff 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -20,7 +20,6 @@ from synapse.api.errors import ( CodeMessageException ) from ._base import BaseHandler -from synapse.http.client import SimpleHttpClient from synapse.util.async import run_on_reactor from synapse.api.errors import SynapseError @@ -35,13 +34,12 @@ class IdentityHandler(BaseHandler): def __init__(self, hs): super(IdentityHandler, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + @defer.inlineCallbacks def threepid_from_creds(self, creds): yield run_on_reactor() - # TODO: get this from the homeserver rather than creating a new one for - # each request - http_client = SimpleHttpClient(self.hs) # XXX: make this configurable! # trustedIdServers = ['matrix.org', 'localhost:8090'] trustedIdServers = ['matrix.org', 'vector.im'] @@ -67,7 +65,7 @@ class IdentityHandler(BaseHandler): data = {} try: - data = yield http_client.get_json( + data = yield self.http_client.get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid" @@ -85,7 +83,6 @@ class IdentityHandler(BaseHandler): def bind_threepid(self, creds, mxid): yield run_on_reactor() logger.debug("binding threepid %r to %s", creds, mxid) - http_client = SimpleHttpClient(self.hs) data = None if 'id_server' in creds: @@ -103,7 +100,7 @@ class IdentityHandler(BaseHandler): raise SynapseError(400, "No client_secret in creds") try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/bind" ), @@ -121,7 +118,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): yield run_on_reactor() - http_client = SimpleHttpClient(self.hs) params = { 'email': email, @@ -131,7 +127,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2e7d0d7f82..a1bed9b0dc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,15 +22,22 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler +from canonicaljson import encode_canonical_json + import logging logger = logging.getLogger(__name__) +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -39,6 +46,7 @@ class MessageHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() + self.snapshot_cache = SnapshotCache() @defer.inlineCallbacks def get_message(self, msg_id=None, room_id=None, sender_id=None, @@ -195,10 +203,8 @@ class MessageHandler(BaseHandler): if membership == Membership.JOIN: joinee = UserID.from_string(builder.state_key) # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", - joinee, - builder.content + yield collect_presencelike_data( + self.distributor, joinee, builder.content ) if token_id is not None: @@ -211,6 +217,16 @@ class MessageHandler(BaseHandler): builder=builder, ) + if event.is_state(): + prev_state = context.current_state.get((event.type, event.state_key)) + if prev_state and event.user_id == prev_state.user_id: + prev_content = encode_canonical_json(prev_state.content) + next_content = encode_canonical_json(event.content) + if prev_content == next_content: + # Duplicate suppression for state updates with same sender + # and content. + defer.returnValue(prev_state) + if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler yield member_handler.change_membership(event, context, is_guest=is_guest) @@ -312,7 +328,6 @@ class MessageHandler(BaseHandler): [serialize_event(c, now) for c in room_state.values()] ) - @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True, include_archived=False): """Retrieve a snapshot of all rooms the user is invited or has joined. @@ -332,6 +347,28 @@ class MessageHandler(BaseHandler): is joined on, may return a "messages" key with messages, depending on the specified PaginationConfig. """ + key = ( + user_id, + pagin_config.from_token, + pagin_config.to_token, + pagin_config.direction, + pagin_config.limit, + as_client_event, + include_archived, + ) + now_ms = self.clock.time_msec() + result = self.snapshot_cache.get(now_ms, key) + if result is not None: + return result + + return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( + user_id, pagin_config, as_client_event, include_archived + )) + + @defer.inlineCallbacks + def _snapshot_all_rooms(self, user_id=None, pagin_config=None, + as_client_event=True, include_archived=False): + memberships = [Membership.INVITE, Membership.JOIN] if include_archived: memberships.append(Membership.LEAVE) @@ -359,6 +396,10 @@ class MessageHandler(BaseHandler): tags_by_room = yield self.store.get_tags_for_user(user_id) + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user(user_id) + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -436,14 +477,22 @@ class MessageHandler(BaseHandler): for c in current_state.values() ] - account_data = [] + account_data_events = [] tags = tags_by_room.get(event.room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - d["account_data"] = account_data + + account_data = account_data_by_room.get(event.room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + d["account_data"] = account_data_events except: logger.exception("Failed to get snapshot") @@ -456,9 +505,17 @@ class MessageHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) + account_data_events = [] + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + ret = { "rooms": rooms_ret, "presence": presence, + "account_data": account_data_events, "receipts": receipt, "end": now_token.to_string(), } @@ -498,14 +555,22 @@ class MessageHandler(BaseHandler): user_id, room_id, pagin_config, membership, member_event_id, is_guest ) - account_data = [] + account_data_events = [] tags = yield self.store.get_tags_for_room(user_id, room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - result["account_data"] = account_data + + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + result["account_data"] = account_data_events defer.returnValue(result) @@ -588,23 +653,28 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - states = {} - if not is_guest: - states = yield presence_handler.get_states( - target_users=[UserID.from_string(m.user_id) for m in room_members], - auth_user=auth_user, - as_event=True, - check_auth=False, - ) + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, + ) defer.returnValue(states.values()) - receipts_handler = self.hs.get_handlers().receipts_handler + @defer.inlineCallbacks + def get_receipts(): + receipts_handler = self.hs.get_handlers().receipts_handler + receipts = yield receipts_handler.get_receipts_for_room( + room_id, + now_token.receipt_key + ) + defer.returnValue(receipts) presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), - receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), + get_receipts(), self.store.get_recent_events_for_room( room_id, limit=limit, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aca65096fc..63d6f30a7b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -62,6 +62,14 @@ def partitionbool(l, func): return ret.get(True, []), ret.get(False, []) +def user_presence_changed(distributor, user, statuscache): + return distributor.fire("user_presence_changed", user, statuscache) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class PresenceHandler(BaseHandler): STATE_LEVELS = { @@ -361,9 +369,7 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_state( target_user.localpart, state_to_store ) - yield self.distributor.fire( - "collect_presencelike_data", target_user, state - ) + yield collect_presencelike_data(self.distributor, target_user, state) if now_level > was_level: state["last_active"] = self.clock.time_msec() @@ -467,7 +473,7 @@ class PresenceHandler(BaseHandler): ) @defer.inlineCallbacks - def send_invite(self, observer_user, observed_user): + def send_presence_invite(self, observer_user, observed_user): """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -878,7 +884,7 @@ class PresenceHandler(BaseHandler): room_ids=room_ids, statuscache=statuscache, ) - yield self.distributor.fire("user_presence_changed", user, statuscache) + yield user_presence_changed(self.distributor, user, statuscache) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -1116,9 +1122,7 @@ class PresenceHandler(BaseHandler): self._user_cachemap[user].get_state()["last_active"] ) - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) + yield collect_presencelike_data(self.distributor, user, state) if "last_active" in state: state = dict(state) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 799faffe53..576c6f09b4 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -28,6 +28,14 @@ import logging logger = logging.getLogger(__name__) +def changed_presencelike_data(distributor, user, state): + return distributor.fire("changed_presencelike_data", user, state) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class ProfileHandler(BaseHandler): def __init__(self, hs): @@ -95,11 +103,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "displayname": new_displayname, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "displayname": new_displayname, + }) yield self._update_join_states(target_user) @@ -144,11 +150,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "avatar_url": new_avatar_url, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "avatar_url": new_avatar_url, + }) yield self._update_join_states(target_user) @@ -208,9 +212,7 @@ class ProfileHandler(BaseHandler): "membership": Membership.JOIN, } - yield self.distributor.fire( - "collect_presencelike_data", user, content - ) + yield collect_presencelike_data(self.distributor, user, content) msg_handler = self.hs.get_handlers().message_handler try: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 493a087031..baf7c14e40 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -31,6 +31,10 @@ import urllib logger = logging.getLogger(__name__) +def registered_user(distributor, user): + return distributor.fire("registered_user", user) + + class RegistrationHandler(BaseHandler): def __init__(self, hs): @@ -38,6 +42,7 @@ class RegistrationHandler(BaseHandler): self.distributor = hs.get_distributor() self.distributor.declare("registered_user") + self.captcha_client = CaptchaServerHttpClient(hs) @defer.inlineCallbacks def check_username(self, localpart): @@ -98,7 +103,7 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) else: # autogen a random user ID attempts = 0 @@ -117,7 +122,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=password_hash) - self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except SynapseError: # if user id is taken, just generate another user_id = None @@ -127,25 +132,9 @@ class RegistrationHandler(BaseHandler): raise RegistrationError( 500, "Cannot generate user ID.") - # create a default avatar for the user - # XXX: ideally clients would explicitly specify one, but given they don't - # and we want consistent and pretty identicons for random users, we'll - # do it here. - try: - auth_user = UserID.from_string(user_id) - media_repository = self.hs.get_resource_for_media_repository() - identicon_resource = media_repository.getChildWithDefault("identicon", None) - upload_resource = media_repository.getChildWithDefault("upload", None) - identicon_bytes = identicon_resource.generate_identicon(user_id, 320, 320) - content_uri = yield upload_resource.create_content( - "image/png", None, identicon_bytes, len(identicon_bytes), auth_user - ) - profile_handler = self.hs.get_handlers().profile_handler - profile_handler.set_avatar_url( - auth_user, auth_user, ("%s#auto" % (content_uri,)) - ) - except NotImplementedError: - pass # make tests pass without messing around creating default avatars + # We used to generate default identicons here, but nowadays + # we want clients to generate their own as part of their branding + # rather than there being consistent matrix-wide ones, so we don't. defer.returnValue((user_id, token)) @@ -167,7 +156,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - self.distributor.fire("registered_user", user) + registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -215,7 +204,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=None ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except Exception, e: yield self.store.add_access_token_to_user(user_id, token) # Ignore Registration errors @@ -302,10 +291,7 @@ class RegistrationHandler(BaseHandler): """ Used only by c/s api v1 """ - # TODO: get this from the homeserver rather than creating a new one for - # each request - client = CaptchaServerHttpClient(self.hs) - data = yield client.post_urlencoded_get_raw( + data = yield self.captcha_client.post_urlencoded_get_raw( "http://www.google.com:80/recaptcha/api/verify", args={ 'privatekey': private_key, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3f04752581..13f66e0df0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -41,6 +41,18 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + +def user_left_room(distributor, user, room_id): + return distributor.fire("user_left_room", user=user, room_id=room_id) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user=user, room_id=room_id) + + class RoomCreationHandler(BaseHandler): PRESETS_DICT = { @@ -438,9 +450,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.JOIN: user = UserID.from_string(event.user_id) - self.distributor.fire( - "user_left_room", user=user, room_id=event.room_id - ) + user_left_room(self.distributor, user, event.room_id) defer.returnValue({"room_id": room_id}) @@ -458,9 +468,7 @@ class RoomMemberHandler(BaseHandler): raise SynapseError(404, "No known servers") # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", joinee, content - ) + yield collect_presencelike_data(self.distributor, joinee, content) content.update({"membership": Membership.JOIN}) builder = self.event_builder_factory.new({ @@ -517,10 +525,13 @@ class RoomMemberHandler(BaseHandler): do_auth=do_auth, ) - user = UserID.from_string(event.user_id) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=room_id - ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + user = UserID.from_string(event.user_id) + yield user_joined_room(self.distributor, user, room_id) @defer.inlineCallbacks def get_inviter(self, event): @@ -693,13 +704,48 @@ class RoomMemberHandler(BaseHandler): token_id, txn_id ): + room_state = yield self.hs.get_state_handler().get_current_state(room_id) + + inviter_display_name = "" + inviter_avatar_url = "" + member_event = room_state.get((EventTypes.Member, user.to_string())) + if member_event: + inviter_display_name = member_event.content.get("displayname", "") + inviter_avatar_url = member_event.content.get("avatar_url", "") + + canonical_room_alias = "" + canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, "")) + if canonical_alias_event: + canonical_room_alias = canonical_alias_event.content.get("alias", "") + + room_name = "" + room_name_event = room_state.get((EventTypes.Name, "")) + if room_name_event: + room_name = room_name_event.content.get("name", "") + + room_join_rules = "" + join_rules_event = room_state.get((EventTypes.JoinRules, "")) + if join_rules_event: + room_join_rules = join_rules_event.content.get("join_rule", "") + + room_avatar_url = "" + room_avatar_event = room_state.get((EventTypes.RoomAvatar, "")) + if room_avatar_event: + room_avatar_url = room_avatar_event.content.get("url", "") + token, public_key, key_validity_url, display_name = ( yield self._ask_id_server_for_third_party_invite( - id_server, - medium, - address, - room_id, - user.to_string() + id_server=id_server, + medium=medium, + address=address, + room_id=room_id, + inviter_user_id=user.to_string(), + room_alias=canonical_room_alias, + room_avatar_url=room_avatar_url, + room_join_rules=room_join_rules, + room_name=room_name, + inviter_display_name=inviter_display_name, + inviter_avatar_url=inviter_avatar_url ) ) msg_handler = self.hs.get_handlers().message_handler @@ -721,7 +767,19 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _ask_id_server_for_third_party_invite( - self, id_server, medium, address, room_id, sender): + self, + id_server, + medium, + address, + room_id, + inviter_user_id, + room_alias, + room_avatar_url, + room_join_rules, + room_name, + inviter_display_name, + inviter_avatar_url + ): is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( id_server_scheme, id_server, ) @@ -731,7 +789,13 @@ class RoomMemberHandler(BaseHandler): "medium": medium, "address": address, "room_id": room_id, - "sender": sender, + "room_alias": room_alias, + "room_avatar_url": room_avatar_url, + "room_join_rules": room_join_rules, + "room_name": room_name, + "sender": inviter_user_id, + "sender_display_name": inviter_display_name, + "sender_avatar_url": inviter_avatar_url, } ) # TODO: Check for success @@ -743,13 +807,17 @@ class RoomMemberHandler(BaseHandler): ) defer.returnValue((token, public_key, key_validity_url, display_name)) + def forget(self, user, room_id): + return self.store.forget(user.to_string(), room_id) + class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): chunk = yield self.store.get_rooms(is_public=True) - results = yield defer.gatherResults( + + room_members = yield defer.gatherResults( [ self.store.get_users_in_room(room["room_id"]) for room in chunk @@ -757,12 +825,30 @@ class RoomListHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) + avatar_urls = yield defer.gatherResults( + [ + self.get_room_avatar_url(room["room_id"]) + for room in chunk + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + for i, room in enumerate(chunk): - room["num_joined_members"] = len(results[i]) + room["num_joined_members"] = len(room_members[i]) + if avatar_urls[i]: + room["avatar_url"] = avatar_urls[i] # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) + @defer.inlineCallbacks + def get_room_avatar_url(self, room_id): + event = yield self.hs.get_state_handler().get_current_state( + room_id, "m.room.avatar" + ) + if event and "url" in event.content: + defer.returnValue(event.content["url"]) + class RoomContextHandler(BaseHandler): @defer.inlineCallbacks diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 50688e51a8..99ef56871c 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -131,6 +131,17 @@ class SearchHandler(BaseHandler): if batch_group == "room_id": room_ids.intersection_update({batch_group_key}) + if not room_ids: + defer.returnValue({ + "search_categories": { + "room_events": { + "results": [], + "count": 0, + "highlights": [], + } + } + }) + rank_map = {} # event_id -> rank of event allowed_events = [] room_groups = {} # Holds result of grouping by room, if applicable @@ -139,11 +150,22 @@ class SearchHandler(BaseHandler): # Holds the next_batch for the entire result set if one of those exists global_next_batch = None + highlights = set() + + count = None + if order_by == "rank": - results = yield self.store.search_msgs( + search_result = yield self.store.search_msgs( room_ids, search_term, keys ) + count = search_result["count"] + + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + results = search_result["results"] + results_map = {r["event"].event_id: r for r in results} rank_map.update({r["event"].event_id: r["rank"] for r in results}) @@ -171,80 +193,78 @@ class SearchHandler(BaseHandler): s["results"].append(e.event_id) elif order_by == "recent": - # In this case we specifically loop through each room as the given - # limit applies to each room, rather than a global list. - # This is not necessarilly a good idea. - for room_id in room_ids: - room_events = [] - if batch_group == "room_id" and batch_group_key == room_id: - pagination_token = batch_token - else: - pagination_token = None - i = 0 - - # We keep looping and we keep filtering until we reach the limit - # or we run out of things. - # But only go around 5 times since otherwise synapse will be sad. - while len(room_events) < search_filter.limit() and i < 5: - i += 1 - results = yield self.store.search_room( - room_id, search_term, keys, search_filter.limit() * 2, - pagination_token=pagination_token, - ) + room_events = [] + i = 0 + + pagination_token = batch_token + + # We keep looping and we keep filtering until we reach the limit + # or we run out of things. + # But only go around 5 times since otherwise synapse will be sad. + while len(room_events) < search_filter.limit() and i < 5: + i += 1 + search_result = yield self.store.search_rooms( + room_ids, search_term, keys, search_filter.limit() * 2, + pagination_token=pagination_token, + ) - results_map = {r["event"].event_id: r for r in results} + if search_result["highlights"]: + highlights.update(search_result["highlights"]) - rank_map.update({r["event"].event_id: r["rank"] for r in results}) + count = search_result["count"] - filtered_events = search_filter.filter([ - r["event"] for r in results - ]) + results = search_result["results"] - events = yield self._filter_events_for_client( - user.to_string(), filtered_events - ) + results_map = {r["event"].event_id: r for r in results} - room_events.extend(events) - room_events = room_events[:search_filter.limit()] + rank_map.update({r["event"].event_id: r["rank"] for r in results}) - if len(results) < search_filter.limit() * 2: - pagination_token = None - break - else: - pagination_token = results[-1]["pagination_token"] - - if room_events: - res = results_map[room_events[-1].event_id] - pagination_token = res["pagination_token"] - - group = room_groups.setdefault(room_id, {}) - if pagination_token: - next_batch = encode_base64("%s\n%s\n%s" % ( - "room_id", room_id, pagination_token - )) - group["next_batch"] = next_batch - - if batch_token: - global_next_batch = next_batch - - group["results"] = [e.event_id for e in room_events] - group["order"] = max( - e.origin_server_ts/1000 for e in room_events - if hasattr(e, "origin_server_ts") - ) + filtered_events = search_filter.filter([ + r["event"] for r in results + ]) + + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) - allowed_events.extend(room_events) + room_events.extend(events) + room_events = room_events[:search_filter.limit()] - # Normalize the group orders - if room_groups: - if len(room_groups) > 1: - mx = max(g["order"] for g in room_groups.values()) - mn = min(g["order"] for g in room_groups.values()) + if len(results) < search_filter.limit() * 2: + pagination_token = None + break + else: + pagination_token = results[-1]["pagination_token"] - for g in room_groups.values(): - g["order"] = (g["order"] - mn) * 1.0 / (mx - mn) + for event in room_events: + group = room_groups.setdefault(event.room_id, { + "results": [], + }) + group["results"].append(event.event_id) + + if room_events and len(room_events) >= search_filter.limit(): + last_event_id = room_events[-1].event_id + pagination_token = results_map[last_event_id]["pagination_token"] + + # We want to respect the given batch group and group keys so + # that if people blindly use the top level `next_batch` token + # it returns more from the same group (if applicable) rather + # than reverting to searching all results again. + if batch_group and batch_group_key: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + batch_group, batch_group_key, pagination_token + )) else: - room_groups.values()[0]["order"] = 1 + global_next_batch = encode_base64("%s\n%s\n%s" % ( + "all", "", pagination_token + )) + + for room_id, group in room_groups.items(): + group["next_batch"] = encode_base64("%s\n%s\n%s" % ( + "room_id", room_id, pagination_token + )) + + allowed_events.extend(room_events) else: # We should never get here due to the guard earlier. @@ -334,20 +354,19 @@ class SearchHandler(BaseHandler): # We're now about to serialize the events. We should not make any # blocking calls after this. Otherwise the 'age' will be wrong - results = { - e.event_id: { + results = [ + { "rank": rank_map[e.event_id], "result": serialize_event(e, time_now), "context": contexts.get(e.event_id, {}), } for e in allowed_events - } - - logger.info("Found %d results", len(results)) + ] rooms_cat_res = { "results": results, - "count": len(results) + "count": count, + "highlights": list(highlights), } if state_results: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 877328b29e..feea407ea2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -15,8 +15,9 @@ from ._base import BaseHandler -from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes +from synapse.api.errors import GuestAccessError +from synapse.util import unwrapFirstError from twisted.internet import defer @@ -28,6 +29,7 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", + "is_guest", "filter", ]) @@ -100,6 +102,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. + "account_data", # List of account_data events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. @@ -115,6 +118,8 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.presence or self.joined or self.invited ) +GuestRoom = collections.namedtuple("GuestRoom", ("room_id", "membership")) + class SyncHandler(BaseHandler): @@ -133,6 +138,18 @@ class SyncHandler(BaseHandler): A Deferred SyncResult. """ + if sync_config.is_guest: + bad_rooms = [] + for room_id in sync_config.filter.list_rooms(): + world_readable = yield self._is_world_readable(room_id) + if not world_readable: + bad_rooms.append(room_id) + + if bad_rooms: + raise GuestAccessError( + bad_rooms, 403, "Guest access not allowed" + ) + if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. @@ -149,6 +166,17 @@ class SyncHandler(BaseHandler): ) defer.returnValue(result) + @defer.inlineCallbacks + def _is_world_readable(self, room_id): + state = yield self.hs.get_state_handler().get_current_state( + room_id, + EventTypes.RoomHistoryVisibility + ) + if state and "history_visibility" in state.content: + defer.returnValue(state.content["history_visibility"] == "world_readable") + else: + defer.returnValue(False) + def current_sync_for_user(self, sync_config, since_token=None, full_state=False): """Get the sync for client needed to match what the server has now. @@ -172,47 +200,71 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) + if sync_config.is_guest: + room_list = [ + GuestRoom(room_id, Membership.JOIN) + for room_id in sync_config.filter.list_rooms() + ] + + account_data = {} + account_data_by_room = {} + tags_by_room = {} + + else: + membership_list = (Membership.INVITE, Membership.JOIN) + if sync_config.filter.include_leave: + membership_list += (Membership.LEAVE, Membership.BAN) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=sync_config.user.to_string(), + membership_list=membership_list + ) + + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + + tags_by_room = yield self.store.get_tags_for_user( + sync_config.user.to_string() + ) presence_stream = self.event_sources.sources["presence"] - # TODO (mjark): This looks wrong, shouldn't we be getting the presence - # UP to the present rather than after the present? - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( + + joined_room_ids = [ + room.room_id for room in room_list + if room.membership == Membership.JOIN + ] + + presence, _ = yield presence_stream.get_new_events( + from_key=0, user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None - ) - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=( - Membership.INVITE, - Membership.JOIN, - Membership.LEAVE, - Membership.BAN - ) + room_ids=joined_room_ids, + is_guest=sync_config.is_guest, ) - tags_by_room = yield self.store.get_tags_for_user( - sync_config.user.to_string() + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token, joined_room_ids ) joined = [] invited = [] archived = [] + deferreds = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync = yield self.full_state_sync_for_joined_room( + room_sync_deferred = self.full_state_sync_for_joined_room( room_id=event.room_id, sync_config=sync_config, now_token=now_token, timeline_since_token=timeline_since_token, ephemeral_by_room=ephemeral_by_room, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) - joined.append(room_sync) + room_sync_deferred.addCallback(joined.append) + deferreds.append(room_sync_deferred) elif event.membership == Membership.INVITE: invite = yield self.store.get_event(event.event_id) invited.append(InvitedSyncResult( @@ -223,18 +275,25 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync = yield self.full_state_sync_for_archived_room( + room_sync_deferred = self.full_state_sync_for_archived_room( sync_config=sync_config, room_id=event.room_id, leave_event_id=event.event_id, leave_token=leave_token, timeline_since_token=timeline_since_token, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) - archived.append(room_sync) + room_sync_deferred.addCallback(archived.append) + deferreds.append(room_sync_deferred) + + yield defer.gatherResults( + deferreds, consumeErrors=True + ).addErrback(unwrapFirstError) defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -244,7 +303,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -262,26 +322,47 @@ class SyncHandler(BaseHandler): state=current_state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) - def account_data_for_room(self, room_id, tags_by_room): - account_data = [] + def account_data_for_user(self, account_data): + account_data_events = [] + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events + + def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): + account_data_events = [] tags = tags_by_room.get(room_id) if tags is not None: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - return account_data + + account_data = account_data_by_room.get(room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events @defer.inlineCallbacks - def ephemeral_by_room(self, sync_config, now_token, since_token=None): + def ephemeral_by_room(self, sync_config, now_token, room_ids, + since_token=None): """Get the ephemeral events for each room the user is in Args: sync_config (SyncConfig): The flags, filters and user for the sync. now_token (StreamToken): Where the server is currently up to. + room_ids (list): List of room id strings to get data for. since_token (StreamToken): Where the server was when the client last synced. Returns: @@ -292,9 +373,6 @@ class SyncHandler(BaseHandler): typing_key = since_token.typing_key if since_token else "0" - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - typing_source = self.event_sources.sources["typing"] typing, typing_key = yield typing_source.get_new_events( user=sync_config.user, @@ -341,7 +419,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token, tags_by_room): + timeline_since_token, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -358,7 +437,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=leave_state, account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) @@ -371,8 +450,38 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] + if sync_config.is_guest: + room_ids = sync_config.filter.list_rooms() + + tags_by_room = {} + account_data = {} + account_data_by_room = {} + + else: + rooms = yield self.store.get_rooms_for_user( + sync_config.user.to_string() + ) + room_ids = [room.room_id for room in rooms] + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token, since_token + ) + + tags_by_room = yield self.store.get_updated_tags( + sync_config.user.to_string(), + since_token.account_data_key, + ) + + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + sync_config.user.to_string(), + since_token.account_data_key, + ) + ) + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token, room_ids, since_token + ) presence_source = self.event_sources.sources["presence"] presence, presence_key = yield presence_source.get_new_events( @@ -380,15 +489,10 @@ class SyncHandler(BaseHandler): from_key=since_token.presence_key, limit=sync_config.filter.presence_limit(), room_ids=room_ids, - # /sync doesn't support guest access, they can't get to this point in code - is_guest=False, + is_guest=sync_config.is_guest, ) now_token = now_token.copy_and_replace("presence_key", presence_key) - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token - ) - rm_handler = self.hs.get_handlers().room_member_handler app_service = yield self.store.get_app_service_by_user_id( sync_config.user.to_string() @@ -408,11 +512,8 @@ class SyncHandler(BaseHandler): from_key=since_token.room_key, to_key=now_token.room_key, limit=timeline_limit + 1, - ) - - tags_by_room = yield self.store.get_updated_tags( - sync_config.user.to_string(), - since_token.account_data_key, + room_ids=room_ids if sync_config.is_guest else (), + is_guest=sync_config.is_guest, ) joined = [] @@ -469,7 +570,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -492,14 +593,15 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room + ephemeral_by_room, tags_by_room, account_data_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room + sync_config, leave_event, since_token, tags_by_room, + account_data_by_room ) archived.append(room_sync) @@ -510,6 +612,7 @@ class SyncHandler(BaseHandler): defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -542,7 +645,10 @@ class SyncHandler(BaseHandler): end_key = "s" + room_key.split('-')[-1] loaded_recents = sync_config.filter.filter_room_timeline(events) loaded_recents = yield self._filter_events_for_client( - sync_config.user.to_string(), loaded_recents, + sync_config.user.to_string(), + loaded_recents, + is_guest=sync_config.is_guest, + require_all_visible_for_guests=False ) loaded_recents.extend(recents) recents = loaded_recents @@ -566,7 +672,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -606,7 +713,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) @@ -616,7 +723,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token, tags_by_room): + since_token, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -654,7 +762,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=state_events_delta, account_data=self.account_data_for_room( - leave_event.room_id, tags_by_room + leave_event.room_id, tags_by_room, account_data_by_room ), ) |