diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-27 14:13:06 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-27 14:13:06 +0100 |
commit | 47519cd8c27c343405431c206660ba74fdea52f6 (patch) | |
tree | 77b7a55778c42e256b99577226a6172f719e8416 /synapse/handlers | |
parent | Implement presence event source. Change the way the notifier indexes listeners (diff) | |
parent | fix joining rooms on webclient (diff) | |
download | synapse-47519cd8c27c343405431c206660ba74fdea52f6.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor
Conflicts: synapse/handlers/events.py synapse/rest/events.py synapse/rest/room.py
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 5 | ||||
-rw-r--r-- | synapse/handlers/events.py | 99 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 146 |
3 files changed, 227 insertions, 23 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8a4aa6e5d6..b645977767 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,12 +17,13 @@ from .register import RegistrationHandler from .room import ( MessageHandler, RoomCreationHandler, RoomMemberHandler, RoomListHandler ) -from .events import EventStreamHandler +from .events import EventStreamHandler, EventHandler from .federation import FederationHandler from .login import LoginHandler from .profile import ProfileHandler from .presence import PresenceHandler from .directory import DirectoryHandler +from .typing import TypingNotificationHandler class Handlers(object): @@ -39,9 +40,11 @@ class Handlers(object): self.room_creation_handler = RoomCreationHandler(hs) self.room_member_handler = RoomMemberHandler(hs) self.event_stream_handler = EventStreamHandler(hs) + self.event_handler = EventHandler(hs) self.federation_handler = FederationHandler(hs) self.profile_handler = ProfileHandler(hs) self.presence_handler = PresenceHandler(hs) self.room_list_handler = RoomListHandler(hs) self.login_handler = LoginHandler(hs) self.directory_handler = DirectoryHandler(hs) + self.typing_notification_handler = TypingNotificationHandler(hs) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index aabec37fc0..b336b292d3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -47,26 +47,81 @@ class EventStreamHandler(BaseHandler): def get_stream(self, auth_user_id, pagin_config, timeout=0): auth_user = self.hs.parse_userid(auth_user_id) - if pagin_config.from_token is None: - pagin_config.from_token = None - - rm_handler = self.hs.get_handlers().room_member_handler - room_ids = yield rm_handler.get_rooms_for_user(auth_user) - - events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout - ) - - chunks = [ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in events - ] - - chunk = { - "chunk": chunks, - "start": tokens[0].to_string(), - "end": tokens[1].to_string(), - } - - defer.returnValue(chunk) + try: + if auth_user not in self._streams_per_user: + self._streams_per_user[auth_user] = 0 + if auth_user in self._stop_timer_per_user: + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(auth_user)) + else: + self.distributor.fire( + "started_user_eventstream", auth_user + ) + self._streams_per_user[auth_user] += 1 + + + if pagin_config.from_token is None: + pagin_config.from_token = None + + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(auth_user) + + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) + + chunks = [ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in events + ] + + chunk = { + "chunk": chunks, + "start": tokens[0].to_string(), + "end": tokens[1].to_string(), + } + + defer.returnValue(chunk) + + finally: + self._streams_per_user[auth_user] -= 1 + if not self._streams_per_user[auth_user]: + del self._streams_per_user[auth_user] + + # 10 seconds of grace to allow the client to reconnect again + # before we think they're gone + def _later(): + self.distributor.fire( + "stopped_user_eventstream", auth_user + ) + del self._stop_timer_per_user[auth_user] + + self._stop_timer_per_user[auth_user] = ( + self.clock.call_later(5, _later) + ) + + +class EventHandler(BaseHandler): + @defer.inlineCallbacks + def get_event(self, user, event_id): + """Retrieve a single specified event. + + Args: + user (synapse.types.UserID): The user requesting the event + event_id (str): The event ID to obtain. + Returns: + dict: An event, or None if there is no event matching this ID. + Raises: + SynapseError if there was a problem retrieving this event, or + AuthError if the user does not have the rights to inspect this + event. + """ + event = yield self.store.get_event(event_id) + + if not event: + defer.returnValue(None) + return + + yield self.auth.check(event, raises=True) + defer.returnValue(event) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py new file mode 100644 index 0000000000..9d38a7336e --- /dev/null +++ b/synapse/handlers/typing.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import BaseHandler + +import logging + +from collections import namedtuple + + +logger = logging.getLogger(__name__) + + +# A tiny object useful for storing a user's membership in a room, as a mapping +# key +RoomMember = namedtuple("RoomMember", ("room_id", "user")) + + +class TypingNotificationHandler(BaseHandler): + def __init__(self, hs): + super(TypingNotificationHandler, self).__init__(hs) + + self.homeserver = hs + + self.clock = hs.get_clock() + + self.federation = hs.get_replication_layer() + + self.federation.register_edu_handler("m.typing", self._recv_edu) + + self._member_typing_until = {} + + @defer.inlineCallbacks + def started_typing(self, target_user, auth_user, room_id, timeout): + if not target_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + if target_user != auth_user: + raise AuthError(400, "Cannot set another user's typing state") + + until = self.clock.time_msec() + timeout + member = RoomMember(room_id=room_id, user=target_user) + + was_present = member in self._member_typing_until + + self._member_typing_until[member] = until + + if was_present: + # No point sending another notification + defer.returnValue(None) + + yield self._push_update( + room_id=room_id, + user=target_user, + typing=True, + ) + + @defer.inlineCallbacks + def stopped_typing(self, target_user, auth_user, room_id): + if not target_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + if target_user != auth_user: + raise AuthError(400, "Cannot set another user's typing state") + + member = RoomMember(room_id=room_id, user=target_user) + + if member not in self._member_typing_until: + # No point + defer.returnValue(None) + + yield self._push_update( + room_id=room_id, + user=target_user, + typing=False, + ) + + @defer.inlineCallbacks + def _push_update(self, room_id, user, typing): + localusers = set() + remotedomains = set() + + rm_handler = self.homeserver.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into(room_id, + localusers=localusers, remotedomains=remotedomains, + ignore_user=user) + + for u in localusers: + self.push_update_to_clients( + room_id=room_id, + observer_user=u, + observed_user=user, + typing=typing, + ) + + deferreds = [] + for domain in remotedomains: + deferreds.append(self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": room_id, + "user_id": user.to_string(), + "typing": typing, + }, + )) + + yield defer.DeferredList(deferreds, consumeErrors=False) + + @defer.inlineCallbacks + def _recv_edu(self, origin, content): + room_id = content["room_id"] + user = self.homeserver.parse_userid(content["user_id"]) + + localusers = set() + + rm_handler = self.homeserver.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into(room_id, + localusers=localusers) + + for u in localusers: + self.push_update_to_clients( + room_id=room_id, + observer_user=u, + observed_user=user, + typing=content["typing"] + ) + + def push_update_to_clients(self, room_id, observer_user, observed_user, + typing): + # TODO(paul) steal this from presence.py + pass |