diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
commit | 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 (patch) | |
tree | 5f5b515cdaa586f1f7f92337471dfdc2e0d4683d /synapse/api/notifier.py | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor (diff) | |
download | synapse-3a2a5b959cb1f56b26af32e1ad4c1db424279eb7.tar.xz |
WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.
Diffstat (limited to 'synapse/api/notifier.py')
-rw-r--r-- | synapse/api/notifier.py | 196 |
1 files changed, 0 insertions, 196 deletions
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py deleted file mode 100644 index ec9c4e513d..0000000000 --- a/synapse/api/notifier.py +++ /dev/null @@ -1,196 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.constants import Membership -from synapse.api.events.room import RoomMemberEvent -from synapse.api.streams.event import EventsStreamData - -from twisted.internet import defer -from twisted.internet import reactor - -import logging - -logger = logging.getLogger(__name__) - - -class Notifier(object): - - def __init__(self, hs): - self.store = hs.get_datastore() - self.hs = hs - self.stored_event_listeners = {} - - @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): - """Called when there is a new room event which may potentially be sent - down listening users' event streams. - - This function looks for interested *users* who may want to be notified - for this event. This is different to users requesting from the event - stream which looks for interested *events* for this user. - - Args: - event (SynapseEvent): The new event, which must have a room_id - store_id (int): The ID of this event after it was stored with the - data store. - '""" - member_list = yield self.store.get_room_members(room_id=event.room_id, - membership="join") - if not member_list: - member_list = [] - - member_list = [u.user_id for u in member_list] - - # invites MUST prod the person being invited, who won't be in the room. - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.INVITE): - member_list.append(event.state_key) - # similarly, LEAVEs must be sent to the person leaving - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.LEAVE): - member_list.append(event.state_key) - - for user_id in member_list: - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event.get_dict(), - stream_type=EventsStreamData.EVENT_TYPE, - store_id=store_id) - - def on_new_user_event(self, user_id, event_data, stream_type, store_id): - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event_data, - stream_type=stream_type, - store_id=store_id - ) - - def _notify_and_callback(self, user_id, event_data, stream_type, store_id): - logger.debug( - "Notifying %s of a new event.", - user_id - ) - - stream_ids = list(self.stored_event_listeners[user_id]) - for stream_id in stream_ids: - self._notify_and_callback_stream(user_id, stream_id, event_data, - stream_type, store_id) - - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - - def _notify_and_callback_stream(self, user_id, stream_id, event_data, - stream_type, store_id): - - event_listener = self.stored_event_listeners[user_id].pop(stream_id) - return_event_object = { - k: event_listener[k] for k in ["start", "chunk", "end"] - } - - # work out the new end token - token = event_listener["start"] - end = self._next_token(stream_type, store_id, token) - return_event_object["end"] = end - - # add the event to the chunk - chunk = event_listener["chunk"] - chunk.append(event_data) - - # callback the defer. We know this can't have been resolved before as - # we always remove the event_listener from the map before resolving. - event_listener["defer"].callback(return_event_object) - - def _next_token(self, stream_type, store_id, current_token): - stream_handler = self.hs.get_handlers().event_stream_handler - return stream_handler.get_event_stream_token( - stream_type, - store_id, - current_token - ) - - def store_events_for(self, user_id=None, stream_id=None, from_tok=None): - """Store all incoming events for this user. This should be paired with - get_events_for to return chunked data. - - Args: - user_id (str): The user to monitor incoming events for. - stream (object): The stream that is receiving events - from_tok (str): The token to monitor incoming events from. - """ - event_listener = { - "start": from_tok, - "chunk": [], - "end": from_tok, - "defer": defer.Deferred(), - } - - if user_id not in self.stored_event_listeners: - self.stored_event_listeners[user_id] = {stream_id: event_listener} - else: - self.stored_event_listeners[user_id][stream_id] = event_listener - - def purge_events_for(self, user_id=None, stream_id=None): - """Purges any stored events for this user. - - Args: - user_id (str): The user to purge stored events for. - """ - try: - del self.stored_event_listeners[user_id][stream_id] - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - except KeyError: - pass - - def get_events_for(self, user_id=None, stream_id=None, timeout=0): - """Retrieve stored events for this user, waiting if necessary. - - It is advisable to wrap this call in a maybeDeferred. - - Args: - user_id (str): The user to get events for. - timeout (int): The time in seconds to wait before giving up. - Returns: - A Deferred or a dict containing the chunk data, depending on if - there was data to return yet. The Deferred callback may be None if - there were no events before the timeout expired. - """ - logger.debug("%s is listening for events.", user_id) - - try: - streams = self.stored_event_listeners[user_id][stream_id]["chunk"] - if streams: - logger.debug("%s returning existing chunk.", user_id) - return streams - except KeyError: - return None - - reactor.callLater( - (timeout / 1000.0), self._timeout, user_id, stream_id - ) - return self.stored_event_listeners[user_id][stream_id]["defer"] - - def _timeout(self, user_id, stream_id): - try: - # We remove the event_listener from the map so that we can't - # resolve the deferred twice. - event_listeners = self.stored_event_listeners[user_id] - event_listener = event_listeners.pop(stream_id) - event_listener["defer"].callback(None) - logger.debug("%s event listening timed out.", user_id) - except KeyError: - pass |