diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 1 | ||||
-rw-r--r-- | synapse/handlers/events.py | 8 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 124 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 5 | ||||
-rw-r--r-- | synapse/handlers/room.py | 271 |
5 files changed, 228 insertions, 181 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index c2f4685c92..3f07b5aa4a 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -24,4 +24,5 @@ class BaseHandler(object): self.notifier = hs.get_notifier() self.room_lock = hs.get_room_lock_manager() self.state_handler = hs.get_state_handler() + self.distributor = hs.get_distributor() self.hs = hs diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 3af7d824a2..6bb797caf2 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,8 +17,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.streams.event import ( - EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData, - RoomDataStreamData + EventStream, EventsStreamData ) from synapse.handlers.presence import PresenceStreamData @@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData class EventStreamHandler(BaseHandler): stream_data_classes = [ - MessagesStreamData, - RoomMemberStreamData, - FeedbackStreamData, - RoomDataStreamData, + EventsStreamData, PresenceStreamData, ] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7026df90a2..9cff444779 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,10 +32,19 @@ logger = logging.getLogger(__name__) class FederationHandler(BaseHandler): """Handles events that originated from federation.""" + def __init__(self, hs): + super(FederationHandler, self).__init__(hs) + + self.distributor.observe( + "user_joined_room", + self._on_user_joined + ) + + self.waiting_for_join_list = {} @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state): + def on_receive(self, event, is_new_state, backfilled): if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -70,6 +79,115 @@ class FederationHandler(BaseHandler): else: with (yield self.room_lock.lock(event.room_id)): - store_id = yield self.store.persist_event(event) + store_id = yield self.store.persist_event(event, backfilled) + + room = yield self.store.get_room(event.room_id) + + if not room: + # Huh, let's try and get the current state + try: + federation = self.hs.get_federation() + yield federation.get_state_for_room( + event.origin, event.room_id + ) + + hosts = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + if self.hs.hostname in hosts: + try: + yield self.store.store_room( + event.room_id, + "", + is_public=False + ) + except: + pass + except: + logger.exception( + "Failed to get current state for room %s", + event.room_id + ) + + if not backfilled: + yield self.notifier.on_new_room_event(event, store_id) + + if event.type == RoomMemberEvent.TYPE: + if event.membership == Membership.JOIN: + user = self.hs.parse_userid(event.target_user_id) + self.distributor.fire( + "user_joined_room", user=user, room_id=event.room_id + ) + + + @log_function + @defer.inlineCallbacks + def backfill(self, dest, room_id, limit): + events = yield self.hs.get_federation().backfill(dest, room_id, limit) + + for event in events: + try: + yield self.store.persist_event(event, backfilled=True) + except: + logger.exception("Failed to persist event: %s", event) + + defer.returnValue(events) - yield self.notifier.on_new_room_event(event, store_id) + @log_function + @defer.inlineCallbacks + def do_invite_join(self, target_host, room_id, joinee, content): + federation = self.hs.get_federation() + + hosts = yield self.store.get_joined_hosts_for_room(room_id) + if self.hs.hostname in hosts: + # We are already in the room. + logger.debug("We're already in the room apparently") + defer.returnValue(False) + + # First get current state to see if we are already joined. + try: + yield federation.get_state_for_room(target_host, room_id) + + hosts = yield self.store.get_joined_hosts_for_room(room_id) + if self.hs.hostname in hosts: + # Oh, we were actually in the room already. + logger.debug("We're already in the room apparently") + defer.returnValue(False) + except Exception: + logger.exception("Failed to get current state") + + new_event = self.event_factory.create_event( + etype=InviteJoinEvent.TYPE, + target_host=target_host, + room_id=room_id, + user_id=joinee, + content=content + ) + + new_event.destinations = [target_host] + + yield federation.handle_new_event(new_event) + + # TODO (erikj): Time out here. + d = defer.Deferred() + self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d) + yield d + + try: + yield self.store.store_room( + event.room_id, + "", + is_public=False + ) + except: + pass + + + defer.returnValue(True) + + + @log_function + def _on_user_joined(self, user, room_id): + waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) + while waiters: + waiters.pop().callback(None) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a2152c99cf..540e114b82 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -685,7 +685,10 @@ class PresenceStreamData(StreamData): super(PresenceStreamData, self).__init__(hs) self.presence = hs.get_handlers().presence_handler - def get_rows(self, user_id, from_key, to_key, limit): + def get_rows(self, user_id, from_key, to_key, limit, direction): + from_key = int(from_key) + to_key = int(to_key) + cachemap = self.presence._user_cachemap # TODO(paul): limit, and filter by visibility diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5d0379254b..899b653fb7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,8 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, MessagesStreamData +from synapse.api.streams.event import EventStream, EventsStreamData +from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -59,12 +60,14 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # Pull out the message from the db - msg = yield self.store.get_message(room_id=room_id, - msg_id=msg_id, - user_id=sender_id) +# msg = yield self.store.get_message( +# room_id=room_id, +# msg_id=msg_id, +# user_id=sender_id +# ) + + # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this. - if msg: - defer.returnValue(msg) defer.returnValue(None) @defer.inlineCallbacks @@ -114,8 +117,9 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [MessagesStreamData(self.hs, room_id=room_id, - feedback=feedback)] + data_source = [ + EventsStreamData(self.hs, room_id=room_id, feedback=feedback) + ] event_stream = EventStream(user_id, data_source) pagin_config = yield event_stream.fix_tokens(pagin_config) data_chunk = yield event_stream.get_chunk(config=pagin_config) @@ -141,12 +145,7 @@ class MessageHandler(BaseHandler): yield self.state_handler.handle_new_event(event) # store in db - store_id = yield self.store.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + store_id = yield self.store.persist_event(event) event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id @@ -201,19 +200,17 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_room_data(room_id, event_type, state_key) + data = yield self.store.get_current_state( + room_id, event_type, state_key + ) defer.returnValue(data) @defer.inlineCallbacks - def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None, - user_id=None, fb_sender_id=None, fb_type=None): - yield self.auth.check_joined_room(room_id, user_id) + def get_feedback(self, event_id): + # yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db - fb = yield self.store.get_feedback( - room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, fb_type=fb_type - ) + fb = yield self.store.get_feedback(event_id) if fb: defer.returnValue(fb) @@ -260,20 +257,59 @@ class MessageHandler(BaseHandler): user_id=user_id, membership_list=[Membership.INVITE, Membership.JOIN] ) - for room_info in room_list: - if room_info["membership"] != Membership.JOIN: + + rooms_ret = [] + + now_rooms_token = yield self.store.get_room_events_max_id() + + # FIXME (erikj): Fix this. + presence_stream = PresenceStreamData(self.hs) + now_presence_token = yield presence_stream.max_token() + presence = yield presence_stream.get_rows( + user_id, 0, now_presence_token, None, None + ) + + # FIXME (erikj): We need to not generate this token, + now_token = "%s_%s" % (now_rooms_token, now_presence_token) + + for event in room_list: + d = { + "room_id": event.room_id, + "membership": event.membership, + } + + if event.membership == Membership.INVITE: + d["inviter"] = event.user_id + + rooms_ret.append(d) + + if event.membership != Membership.JOIN: continue try: - event_chunk = yield self.get_messages( - user_id=user_id, - pagin_config=pagin_config, - feedback=feedback, - room_id=room_info["room_id"] + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=10, + end_token=now_rooms_token, ) - room_info["messages"] = event_chunk + + d["messages"] = { + "chunk": [m.get_dict() for m in messages], + "start": token[0], + "end": token[1], + } + + current_state = yield self.store.get_current_state(event.room_id) + d["state"] = [c.get_dict() for c in current_state] except: - pass - defer.returnValue(room_list) + logger.exception("Failed to get snapshot") + + user = self.hs.parse_userid(user_id) + + ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} + + logger.debug("snapshot_all_rooms returning: %s", ret) + + defer.returnValue(ret) class RoomCreationHandler(BaseHandler): @@ -372,7 +408,6 @@ class RoomCreationHandler(BaseHandler): yield self.hs.get_handlers().room_member_handler.change_membership( join_event, - broadcast_msg=True, do_auth=False ) @@ -451,11 +486,11 @@ class RoomMemberHandler(BaseHandler): member_list = yield self.store.get_room_members(room_id=room_id) event_list = [ - entry.as_event(self.event_factory).get_dict() + entry.get_dict() for entry in member_list ] chunk_data = { - "start": "START", + "start": "START", # FIXME (erikj): START is no longer a valid value "end": "END", "chunk": event_list } @@ -484,29 +519,28 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(member) @defer.inlineCallbacks - def change_membership(self, event=None, broadcast_msg=False, do_auth=True): + def change_membership(self, event=None, do_auth=True): """ Change the membership status of a user in a room. Args: event (SynapseEvent): The membership event - broadcast_msg (bool): True to inject a membership message into this - room on success. Raises: SynapseError if there was a problem changing the membership. """ - #broadcast_msg = False - prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id ) - if prev_state and prev_state.membership == event.membership: - # treat this event as a NOOP. - if do_auth: # This is mainly to fix a unit test. - yield self.auth.check(event, raises=True) - defer.returnValue({}) - return + if prev_state: + event.content["prev"] = prev_state.membership + +# if prev_state and prev_state.membership == event.membership: +# # treat this event as a NOOP. +# if do_auth: # This is mainly to fix a unit test. +# yield self.auth.check(event, raises=True) +# defer.returnValue({}) +# return room_id = event.room_id @@ -514,9 +548,7 @@ class RoomMemberHandler(BaseHandler): # if this HS is not currently in the room, i.e. we have to do the # invite/join dance. if event.membership == Membership.JOIN: - yield self._do_join( - event, do_auth=do_auth, broadcast_msg=broadcast_msg - ) + yield self._do_join(event, do_auth=do_auth) else: # This is not a JOIN, so we can handle it normally. if do_auth: @@ -534,7 +566,6 @@ class RoomMemberHandler(BaseHandler): yield self._do_local_membership_update( event, membership=event.content["membership"], - broadcast_msg=broadcast_msg, ) defer.returnValue({"room_id": room_id}) @@ -569,14 +600,14 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True): + def _do_join(self, event, room_host=None, do_auth=True): joinee = self.hs.parse_userid(event.target_user_id) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id # If event doesn't include a display name, add one. - yield self._fill_out_join_content( - joinee, event.content + yield self.distributor.fire( + "collect_presencelike_data", joinee, event.content ) # XXX: We don't do an auth check if we are doing an invite @@ -584,9 +615,9 @@ class RoomMemberHandler(BaseHandler): # that we are allowed to join when we decide whether or not we # need to do the invite/join dance. - room = yield self.store.get_room(room_id) + hosts = yield self.store.get_joined_hosts_for_room(room_id) - if room: + if self.hs.hostname in hosts: should_do_dance = False elif room_host: should_do_dance = True @@ -598,7 +629,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.INVITE: room = yield self.store.get_room(room_id) inviter = UserID.from_string( - prev_state.sender, self.hs + prev_state.user_id, self.hs ) should_do_dance = not inviter.is_mine and not room @@ -606,8 +637,15 @@ class RoomMemberHandler(BaseHandler): else: should_do_dance = False + have_joined = False + if should_do_dance: + handler = self.hs.get_handlers().federation_handler + have_joined = yield handler.do_invite_join( + room_host, room_id, event.user_id, event.content + ) + # We want to do the _do_update inside the room lock. - if not should_do_dance: + if not have_joined: logger.debug("Doing normal join") if do_auth: @@ -617,16 +655,6 @@ class RoomMemberHandler(BaseHandler): yield self._do_local_membership_update( event, membership=event.content["membership"], - broadcast_msg=broadcast_msg, - ) - - - if should_do_dance: - yield self._do_invite_join_dance( - room_id=room_id, - joinee=event.user_id, - target_host=room_host, - content=event.content, ) user = self.hs.parse_userid(event.user_id) @@ -635,32 +663,6 @@ class RoomMemberHandler(BaseHandler): ) @defer.inlineCallbacks - def _fill_out_join_content(self, user_id, content): - # If event doesn't include a display name, add one. - profile_handler = self.hs.get_handlers().profile_handler - if "displayname" not in content: - try: - display_name = yield profile_handler.get_displayname( - user_id - ) - - if display_name: - content["displayname"] = display_name - except: - logger.exception("Failed to set display_name") - - if "avatar_url" not in content: - try: - avatar_url = yield profile_handler.get_avatar_url( - user_id - ) - - if avatar_url: - content["avatar_url"] = avatar_url - except: - logger.exception("Failed to set display_name") - - @defer.inlineCallbacks def _should_invite_join(self, room_id, prev_state, do_auth): logger.debug("_should_invite_join: room_id: %s", room_id) @@ -694,18 +696,12 @@ class RoomMemberHandler(BaseHandler): user_id=user.to_string(), membership_list=membership_list ) - defer.returnValue([r["room_id"] for r in rooms]) + defer.returnValue([r.room_id for r in rooms]) @defer.inlineCallbacks - def _do_local_membership_update(self, event, membership, broadcast_msg): + def _do_local_membership_update(self, event, membership): # store membership - store_id = yield self.store.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=membership - ) + store_id = yield self.store.persist_event(event) # Send a PDU to all hosts who have joined the room. destinations = yield self.store.get_joined_hosts_for_room( @@ -732,78 +728,11 @@ class RoomMemberHandler(BaseHandler): yield self.hs.get_federation().handle_new_event(event) self.notifier.on_new_room_event(event, store_id) - if broadcast_msg: - yield self._inject_membership_msg( - source=event.user_id, - target=event.target_user_id, - room_id=event.room_id, - membership=event.content["membership"] - ) - - @defer.inlineCallbacks - def _do_invite_join_dance(self, room_id, joinee, target_host, content): - logger.debug("Doing remote join dance") - - # do invite join dance - federation = self.hs.get_federation() - new_event = self.event_factory.create_event( - etype=InviteJoinEvent.TYPE, - target_host=target_host, - room_id=room_id, - user_id=joinee, - content=content - ) - - new_event.destinations = [target_host] - - yield self.store.store_room( - room_id, "", is_public=False - ) - - #yield self.state_handler.handle_new_event(event) - yield federation.handle_new_event(new_event) - yield federation.get_state_for_room( - target_host, room_id - ) - - @defer.inlineCallbacks - def _inject_membership_msg(self, room_id=None, source=None, target=None, - membership=None): - # TODO this should be a different type of message, not m.text - if membership == Membership.INVITE: - body = "%s invited %s to the room." % (source, target) - elif membership == Membership.JOIN: - body = "%s joined the room." % (target) - elif membership == Membership.LEAVE: - body = "%s left the room." % (target) - else: - raise RoomError(500, "Unknown membership value %s" % membership) - - membership_json = { - "msgtype": u"m.text", - "body": body, - "membership_source": source, - "membership_target": target, - "membership": membership, - } - - msg_id = "m%s" % int(self.clock.time_msec()) - - event = self.event_factory.create_event( - etype=MessageEvent.TYPE, - room_id=room_id, - user_id="_homeserver_", - msg_id=msg_id, - content=membership_json - ) - - handler = self.hs.get_handlers().message_handler - yield handler.send_message(event, suppress_auth=True) - class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): - chunk = yield self.store.get_rooms(is_public=True, with_topics=True) + chunk = yield self.store.get_rooms(is_public=True) + # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) |