diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/notifier.py | 196 | ||||
-rw-r--r-- | synapse/api/streams/__init__.py | 103 | ||||
-rw-r--r-- | synapse/api/streams/event.py | 194 | ||||
-rw-r--r-- | synapse/handlers/events.py | 106 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 2 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 39 | ||||
-rw-r--r-- | synapse/handlers/room.py | 79 | ||||
-rw-r--r-- | synapse/notifier.py | 236 | ||||
-rw-r--r-- | synapse/rest/events.py | 3 | ||||
-rw-r--r-- | synapse/rest/initial_sync.py | 2 | ||||
-rw-r--r-- | synapse/rest/room.py | 2 | ||||
-rw-r--r-- | synapse/server.py | 7 | ||||
-rw-r--r-- | synapse/storage/stream.py | 4 | ||||
-rw-r--r-- | synapse/streams/__init__.py | 14 | ||||
-rw-r--r-- | synapse/streams/config.py | 84 | ||||
-rw-r--r-- | synapse/streams/events.py | 180 | ||||
-rw-r--r-- | synapse/types.py | 33 |
17 files changed, 634 insertions, 650 deletions
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py deleted file mode 100644 index ec9c4e513d..0000000000 --- a/synapse/api/notifier.py +++ /dev/null @@ -1,196 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.constants import Membership -from synapse.api.events.room import RoomMemberEvent -from synapse.api.streams.event import EventsStreamData - -from twisted.internet import defer -from twisted.internet import reactor - -import logging - -logger = logging.getLogger(__name__) - - -class Notifier(object): - - def __init__(self, hs): - self.store = hs.get_datastore() - self.hs = hs - self.stored_event_listeners = {} - - @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): - """Called when there is a new room event which may potentially be sent - down listening users' event streams. - - This function looks for interested *users* who may want to be notified - for this event. This is different to users requesting from the event - stream which looks for interested *events* for this user. - - Args: - event (SynapseEvent): The new event, which must have a room_id - store_id (int): The ID of this event after it was stored with the - data store. - '""" - member_list = yield self.store.get_room_members(room_id=event.room_id, - membership="join") - if not member_list: - member_list = [] - - member_list = [u.user_id for u in member_list] - - # invites MUST prod the person being invited, who won't be in the room. - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.INVITE): - member_list.append(event.state_key) - # similarly, LEAVEs must be sent to the person leaving - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.LEAVE): - member_list.append(event.state_key) - - for user_id in member_list: - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event.get_dict(), - stream_type=EventsStreamData.EVENT_TYPE, - store_id=store_id) - - def on_new_user_event(self, user_id, event_data, stream_type, store_id): - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event_data, - stream_type=stream_type, - store_id=store_id - ) - - def _notify_and_callback(self, user_id, event_data, stream_type, store_id): - logger.debug( - "Notifying %s of a new event.", - user_id - ) - - stream_ids = list(self.stored_event_listeners[user_id]) - for stream_id in stream_ids: - self._notify_and_callback_stream(user_id, stream_id, event_data, - stream_type, store_id) - - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - - def _notify_and_callback_stream(self, user_id, stream_id, event_data, - stream_type, store_id): - - event_listener = self.stored_event_listeners[user_id].pop(stream_id) - return_event_object = { - k: event_listener[k] for k in ["start", "chunk", "end"] - } - - # work out the new end token - token = event_listener["start"] - end = self._next_token(stream_type, store_id, token) - return_event_object["end"] = end - - # add the event to the chunk - chunk = event_listener["chunk"] - chunk.append(event_data) - - # callback the defer. We know this can't have been resolved before as - # we always remove the event_listener from the map before resolving. - event_listener["defer"].callback(return_event_object) - - def _next_token(self, stream_type, store_id, current_token): - stream_handler = self.hs.get_handlers().event_stream_handler - return stream_handler.get_event_stream_token( - stream_type, - store_id, - current_token - ) - - def store_events_for(self, user_id=None, stream_id=None, from_tok=None): - """Store all incoming events for this user. This should be paired with - get_events_for to return chunked data. - - Args: - user_id (str): The user to monitor incoming events for. - stream (object): The stream that is receiving events - from_tok (str): The token to monitor incoming events from. - """ - event_listener = { - "start": from_tok, - "chunk": [], - "end": from_tok, - "defer": defer.Deferred(), - } - - if user_id not in self.stored_event_listeners: - self.stored_event_listeners[user_id] = {stream_id: event_listener} - else: - self.stored_event_listeners[user_id][stream_id] = event_listener - - def purge_events_for(self, user_id=None, stream_id=None): - """Purges any stored events for this user. - - Args: - user_id (str): The user to purge stored events for. - """ - try: - del self.stored_event_listeners[user_id][stream_id] - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - except KeyError: - pass - - def get_events_for(self, user_id=None, stream_id=None, timeout=0): - """Retrieve stored events for this user, waiting if necessary. - - It is advisable to wrap this call in a maybeDeferred. - - Args: - user_id (str): The user to get events for. - timeout (int): The time in seconds to wait before giving up. - Returns: - A Deferred or a dict containing the chunk data, depending on if - there was data to return yet. The Deferred callback may be None if - there were no events before the timeout expired. - """ - logger.debug("%s is listening for events.", user_id) - - try: - streams = self.stored_event_listeners[user_id][stream_id]["chunk"] - if streams: - logger.debug("%s returning existing chunk.", user_id) - return streams - except KeyError: - return None - - reactor.callLater( - (timeout / 1000.0), self._timeout, user_id, stream_id - ) - return self.stored_event_listeners[user_id][stream_id]["defer"] - - def _timeout(self, user_id, stream_id): - try: - # We remove the event_listener from the map so that we can't - # resolve the deferred twice. - event_listeners = self.stored_event_listeners[user_id] - event_listener = event_listeners.pop(stream_id) - event_listener["defer"].callback(None) - logger.debug("%s event listening timed out.", user_id) - except KeyError: - pass diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py deleted file mode 100644 index d831eafbab..0000000000 --- a/synapse/api/streams/__init__.py +++ /dev/null @@ -1,103 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.errors import SynapseError - - -class PaginationConfig(object): - - """A configuration object which stores pagination parameters.""" - - def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = from_tok - self.to_tok = to_tok - self.direction = direction - self.limit = limit - - @classmethod - def from_request(cls, request, raise_invalid_params=True): - params = { - "from_tok": "END", - "direction": 'f', - } - - query_param_mappings = [ # 3-tuple of qp_key, attribute, rules - ("from", "from_tok", lambda x: type(x) == str), - ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()), - ("dir", "direction", lambda x: x == 'f' or x == 'b'), - ] - - for qp, attr, is_valid in query_param_mappings: - if qp in request.args: - if is_valid(request.args[qp][0]): - params[attr] = request.args[qp][0] - elif raise_invalid_params: - raise SynapseError(400, "%s parameter is invalid." % qp) - - return PaginationConfig(**params) - - def __str__(self): - return ( - "<PaginationConfig from_tok=%s, to_tok=%s, " - "direction=%s, limit=%s>" - ) % (self.from_tok, self.to_tok, self.direction, self.limit) - - -class PaginationStream(object): - - """ An interface for streaming data as chunks. """ - - TOK_END = "END" - - def get_chunk(self, config=None): - """ Return the next chunk in the stream. - - Args: - config (PaginationConfig): The config to aid which chunk to get. - Returns: - A dict containing the new start token "start", the new end token - "end" and the data "chunk" as a list. - """ - raise NotImplementedError() - - -class StreamData(object): - - """ An interface for obtaining streaming data from a table. """ - - def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - - def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): - """ Get event stream data between the specified pkeys. - - Args: - user_id : The user's ID - from_pkey : The starting pkey. - to_pkey : The end pkey. May be -1 to mean "latest". - limit: The max number of results to return. - Returns: - A tuple containing the list of event stream data and the last pkey. - """ - raise NotImplementedError() - - def max_token(self): - """ Get the latest currently-valid token. - - Returns: - The latest token.""" - raise NotImplementedError() diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py deleted file mode 100644 index fe44a488bc..0000000000 --- a/synapse/api/streams/event.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains classes for streaming from the event stream: /events. -""" -from twisted.internet import defer - -from synapse.api.errors import EventStreamError -from synapse.api.events import SynapseEvent -from synapse.api.streams import PaginationStream, StreamData - -import logging - -logger = logging.getLogger(__name__) - - -class EventsStreamData(StreamData): - EVENT_TYPE = "EventsStream" - - def __init__(self, hs, room_id=None, feedback=False): - super(EventsStreamData, self).__init__(hs) - self.room_id = room_id - self.with_feedback = feedback - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit, direction): - data, latest_ver = yield self.store.get_room_events( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id, - with_feedback=self.with_feedback - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_room_events_max_id() - defer.returnValue(val) - - -class EventStream(PaginationStream): - - SEPARATOR = '_' - - def __init__(self, user_id, stream_data_list): - super(EventStream, self).__init__() - self.user_id = user_id - self.stream_data = stream_data_list - - @defer.inlineCallbacks - def fix_tokens(self, pagination_config): - pagination_config.from_tok = yield self.fix_token( - pagination_config.from_tok) - pagination_config.to_tok = yield self.fix_token( - pagination_config.to_tok) - - if ( - not pagination_config.to_tok - and pagination_config.direction == 'f' - ): - pagination_config.to_tok = yield self.get_current_max_token() - - logger.debug("pagination_config: %s", pagination_config) - - defer.returnValue(pagination_config) - - @defer.inlineCallbacks - def fix_token(self, token): - """Fixes unknown values in a token to known values. - - Args: - token (str): The token to fix up. - Returns: - The fixed-up token, which may == token. - """ - if token == PaginationStream.TOK_END: - new_token = yield self.get_current_max_token() - - logger.debug("fix_token: From %s to %s", token, new_token) - - token = new_token - - defer.returnValue(token) - - @defer.inlineCallbacks - def get_current_max_token(self): - new_token_parts = [] - for s in self.stream_data: - mx = yield s.max_token() - new_token_parts.append(str(mx)) - - new_token = EventStream.SEPARATOR.join(new_token_parts) - - logger.debug("get_current_max_token: %s", new_token) - - defer.returnValue(new_token) - - @defer.inlineCallbacks - def get_chunk(self, config): - # no support for limit on >1 streams, makes no sense. - if config.limit and len(self.stream_data) > 1: - raise EventStreamError( - 400, "Limit not supported on multiplexed streams." - ) - - chunk_data, next_tok = yield self._get_chunk_data( - config.from_tok, - config.to_tok, - config.limit, - config.direction, - ) - - defer.returnValue({ - "chunk": chunk_data, - "start": config.from_tok, - "end": next_tok - }) - - @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit, direction): - """ Get event data between the two tokens. - - Tokens are SEPARATOR separated values representing pkey values of - certain tables, and the position determines the StreamData invoked - according to the STREAM_DATA list. - - The magic value '-1' can be used to get the latest value. - - Args: - from_tok - The token to start from. - to_tok - The token to end at. Must have values > from_tok or be -1. - Returns: - A list of event data. - Raises: - EventStreamError if something went wrong. - """ - # sanity check - if to_tok is not None: - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") - - chunk = [] - next_ver = [] - for i, (from_pkey, to_pkey) in enumerate(zip( - self._split_token(from_tok), - self._split_token(to_tok) - )): - if from_pkey == to_pkey: - # tokens are the same, we have nothing to do. - next_ver.append(str(to_pkey)) - continue - - (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit, direction, - ) - - chunk.extend([ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in event_chunk - ]) - next_ver.append(str(max_pkey)) - - defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) - - def _split_token(self, token): - """Splits the given token into a list of pkeys. - - Args: - token (str): The token with SEPARATOR values. - Returns: - A list of ints. - """ - if token: - segments = token.split(EventStream.SEPARATOR) - else: - segments = [None] * len(self.stream_data) - return segments diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1bd173acd8..e08231406d 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,19 +15,17 @@ from twisted.internet import defer +from synapse.api.events import SynapseEvent + from ._base import BaseHandler -from synapse.api.streams.event import ( - EventStream, EventsStreamData -) -from synapse.handlers.presence import PresenceStreamData +import logging -class EventStreamHandler(BaseHandler): - stream_data_classes = [ - EventsStreamData, - PresenceStreamData, - ] +logger = logging.getLogger(__name__) + + +class EventStreamHandler(BaseHandler): def __init__(self, hs): super(EventStreamHandler, self).__init__(hs) @@ -43,45 +41,12 @@ class EventStreamHandler(BaseHandler): self.clock = hs.get_clock() - def get_event_stream_token(self, stream_type, store_id, start_token): - """Return the next token after this event. - - Args: - stream_type (str): The StreamData.EVENT_TYPE - store_id (int): The new storage ID assigned from the data store. - start_token (str): The token the user started with. - Returns: - str: The end token. - """ - for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes): - if stream_cls.EVENT_TYPE == stream_type: - # this is the stream for this event, so replace this part of - # the token - store_ids = start_token.split(EventStream.SEPARATOR) - store_ids[i] = str(store_id) - return EventStream.SEPARATOR.join(store_ids) - raise RuntimeError("Didn't find a stream type %s" % stream_type) + self.notifier = hs.get_notifier() @defer.inlineCallbacks def get_stream(self, auth_user_id, pagin_config, timeout=0): - """Gets events as an event stream for this user. - - This function looks for interesting *events* for this user. This is - different from the notifier, which looks for interested *users* who may - want to know about a single event. - - Args: - auth_user_id (str): The user requesting their event stream. - pagin_config (synapse.api.streams.PaginationConfig): The config to - use when obtaining the stream. - timeout (int): The max time to wait for an incoming event in ms. - Returns: - A pagination stream API dict - """ auth_user = self.hs.parse_userid(auth_user_id) - stream_id = object() - try: if auth_user not in self._streams_per_user: self._streams_per_user[auth_user] = 0 @@ -94,41 +59,30 @@ class EventStreamHandler(BaseHandler): ) self._streams_per_user[auth_user] += 1 - # construct an event stream with the correct data ordering - stream_data_list = [] - for stream_class in EventStreamHandler.stream_data_classes: - stream_data_list.append(stream_class(self.hs)) - event_stream = EventStream(auth_user_id, stream_data_list) - - # fix unknown tokens to known tokens - pagin_config = yield event_stream.fix_tokens(pagin_config) - - # register interest in receiving new events - self.notifier.store_events_for(user_id=auth_user_id, - stream_id=stream_id, - from_tok=pagin_config.from_tok) - - # see if we can grab a chunk now - data_chunk = yield event_stream.get_chunk(config=pagin_config) - - # if there are previous events, return those. If not, wait on the - # new events for 'timeout' seconds. - if len(data_chunk["chunk"]) == 0 and timeout != 0: - results = yield defer.maybeDeferred( - self.notifier.get_events_for, - user_id=auth_user_id, - stream_id=stream_id, - timeout=timeout - ) - if results: - defer.returnValue(results) + if pagin_config.from_token is None: + pagin_config.from_token = None - defer.returnValue(data_chunk) - finally: - # cleanup - self.notifier.purge_events_for(user_id=auth_user_id, - stream_id=stream_id) + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(auth_user) + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) + + chunks = [ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in events + ] + + chunk = { + "chunk": chunks, + "start": tokens[0].to_string(), + "end": tokens[1].to_string(), + } + + defer.returnValue(chunk) + + finally: self._streams_per_user[auth_user] -= 1 if not self._streams_per_user[auth_user]: del self._streams_per_user[auth_user] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bfc1ab86f2..62edd5dbdc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -110,7 +110,7 @@ class FederationHandler(BaseHandler): ) if not backfilled: - yield self.notifier.on_new_room_event(event, store_id) + yield self.notifier.on_new_room_event(event) if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index be10162db5..c479908f61 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.api.streams import StreamData from ._base import BaseHandler @@ -677,46 +676,10 @@ class PresenceHandler(BaseHandler): statuscache.make_event(user=observed_user, clock=self.clock) self.notifier.on_new_user_event( - observer_user.to_string(), - event_data=statuscache.make_event( - user=observed_user, - clock=self.clock - ), - stream_type=PresenceStreamData, - store_id=statuscache.serial + [observer_user], ) -class PresenceStreamData(StreamData): - def __init__(self, hs): - super(PresenceStreamData, self).__init__(hs) - self.presence = hs.get_handlers().presence_handler - - def get_rows(self, user_id, from_key, to_key, limit, direction): - from_key = int(from_key) - to_key = int(to_key) - - cachemap = self.presence._user_cachemap - - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial <= to_key] - - if updates: - clock = self.presence.clock - - latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - return ((data, latest_serial)) - else: - return (([], self.presence._user_cachemap_latest_serial)) - - def max_token(self): - return self.presence._user_cachemap_latest_serial - -PresenceStreamData.EVENT_TYPE = PresenceStreamData - - class UserPresenceCache(object): """Store an observed user's state and status message. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..760373344d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData +from synapse.streams.config import PaginationConfig from synapse.util import stringutils from ._base import BaseHandler @@ -95,7 +94,7 @@ class MessageHandler(BaseHandler): event.room_id ) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) yield self.hs.get_federation().handle_new_event(event) @@ -115,13 +114,24 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources["room"] + + if not pagin_config.from_token: + pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() + + user = self.hs.parse_userid(user_id) + + events, next_token = yield data_source.get_pagination_rows( + user, pagin_config, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -148,7 +158,7 @@ class MessageHandler(BaseHandler): event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id ) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) yield self.hs.get_federation().handle_new_event(event) @@ -230,7 +240,7 @@ class MessageHandler(BaseHandler): ) yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, @@ -256,20 +266,18 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) + user = self.hs.parse_userid(user_id) + rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + now_token = yield self.hs.get_event_sources().get_current_token() - # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources["presence"] + pagination_config = PaginationConfig(from_token=now_token) + presence, _ = yield presence_stream.get_pagination_rows( + user, pagination_config, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -291,7 +299,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key, ) d["messages"] = { @@ -300,14 +308,18 @@ class MessageHandler(BaseHandler): "end": token[1], } - current_state = yield self.store.get_current_state(event.room_id) + current_state = yield self.store.get_current_state( + event.room_id + ) d["state"] = [c.get_dict() for c in current_state] except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = { + "rooms": rooms_ret, + "presence": presence, + "end": now_token.to_string() + } defer.returnValue(ret) @@ -391,10 +403,8 @@ class RoomCreationHandler(BaseHandler): ) yield self.state_handler.handle_new_event(config_event) - # store_id = persist... yield self.hs.get_federation().handle_new_event(config_event) - # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} join_event = self.event_factory.create_event( @@ -490,7 +500,7 @@ class RoomMemberHandler(BaseHandler): for entry in member_list ] chunk_data = { - "start": "START", # FIXME (erikj): START is no longer a valid value + "start": "START", # FIXME (erikj): START is no longer valid "end": "END", "chunk": event_list } @@ -712,23 +722,20 @@ class RoomMemberHandler(BaseHandler): # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) if membership == Membership.INVITE: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = target_user.domain destinations.append(host) # If we are joining a remote HS, include that. if membership == Membership.JOIN: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = target_user.domain destinations.append(host) event.destinations = list(set(destinations)) yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event, extra_users=[target_user]) class RoomListHandler(BaseHandler): diff --git a/synapse/notifier.py b/synapse/notifier.py new file mode 100644 index 0000000000..1656717cd7 --- /dev/null +++ b/synapse/notifier.py @@ -0,0 +1,236 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +from synapse.util.logutils import log_function + +import logging + + +logger = logging.getLogger(__name__) + + +class _NotificationListener(object): + """ This represents a single client connection to the events stream. + + The events stream handler will have yielded to the deferred, so to + notify the handler it is sufficient to resolve the deferred. + + This listener will also keep track of which rooms it is listening in + so that it can remove itself from the indexes in the Notifier class. + """ + + def __init__(self, user, rooms, from_token, limit, timeout, deferred): + self.user = user + self.from_token = from_token + self.limit = limit + self.timeout = timeout + self.deferred = deferred + + self.rooms = rooms + + self.pending_notifications = [] + + def notify(self, notifier, events, start_token, end_token): + """ Inform whoever is listening about the new events. This will + also remove this listener from all the indexes in the Notifier + it knows about. + """ + + result = (events, (start_token, end_token)) + + try: + self.deferred.callback(result) + except defer.AlreadyCalledError: + pass + + for room in self.rooms: + lst = notifier.rooms_to_listeners.get(room, set()) + lst.discard(self) + + notifier.user_to_listeners.get(self.user, set()).discard(self) + + +class Notifier(object): + """ This class is responsible for notifying any listeners when there are + new events available for it. + + Primarily used from the /events stream. + """ + + def __init__(self, hs): + self.hs = hs + + self.rooms_to_listeners = {} + self.user_to_listeners = {} + + self.event_sources = hs.get_event_sources() + + hs.get_distributor().observe( + "user_joined_room", self._user_joined_room + ) + + @log_function + @defer.inlineCallbacks + def on_new_room_event(self, event, extra_users=[]): + """ Used by handlers to inform the notifier something has happened + in the room, room event wise. + + This triggers the notifier to wake up any listeners that are + listening to the room, and any listeners for the users in the + `extra_users` param. + """ + room_id = event.room_id + + source = self.event_sources.sources["room"] + + listeners = self.rooms_to_listeners.get(room_id, set()).copy() + + for user in extra_users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + logger.debug("on_new_room_event listeners %s", listeners) + + # TODO (erikj): Can we make this more efficient by hitting the + # db once? + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + @defer.inlineCallbacks + def on_new_user_event(self, users=[], rooms=[]): + """ Used to inform listeners that something has happend + presence/user event wise. + + Will wake up all listeners for the given users and rooms. + """ + source = self.event_sources.sources["presence"] + + listeners = set() + + for user in users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + for room in rooms: + listeners |= self.rooms_to_listeners.get(room, set()).copy() + + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + def get_events_for(self, user, rooms, pagination_config, timeout): + """ For the given user and rooms, return any new events for them. If + there are no new events wait for up to `timeout` milliseconds for any + new events to happen before returning. + """ + deferred = defer.Deferred() + + self._get_events( + deferred, user, rooms, pagination_config.from_token, + pagination_config.limit, timeout + ).addErrback(deferred.errback) + + return deferred + + @defer.inlineCallbacks + def _get_events(self, deferred, user, rooms, from_token, limit, timeout): + if not from_token: + from_token = yield self.event_sources.get_current_token() + + listener = _NotificationListener( + user, + rooms, + from_token, + limit, + timeout, + deferred, + ) + + if timeout: + reactor.callLater(timeout/1000, self._timeout_listener, listener) + + self._register_with_keys(listener) + yield self._check_for_updates(listener) + + return + + def _timeout_listener(self, listener): + # TODO (erikj): We should probably set to_token to the current max + # rather than reusing from_token. + listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) + + @log_function + def _register_with_keys(self, listener): + for room in listener.rooms: + s = self.rooms_to_listeners.setdefault(room, set()) + s.add(listener) + + self.user_to_listeners.setdefault(listener.user, set()).add(listener) + + @defer.inlineCallbacks + @log_function + def _check_for_updates(self, listener): + # TODO (erikj): We need to think about limits across multiple sources + events = [] + + from_token = listener.from_token + limit = listener.limit + + # TODO (erikj): DeferredList? + for source in self.event_sources.sources.values(): + stuff, new_token = yield source.get_new_events_for_user( + listener.user, + from_token, + limit, + ) + + events.extend(stuff) + + from_token = new_token + + end_token = from_token + + if events: + listener.notify(self, events, listener.from_token, end_token) + + defer.returnValue(listener) + + def _user_joined_room(self, user, room_id): + new_listeners = self.user_to_listeners.get(user, set()) + + listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners |= new_listeners diff --git a/synapse/rest/events.py b/synapse/rest/events.py index d89dfc193c..2e7563d14b 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig from synapse.rest.base import RestServlet, client_path_pattern @@ -41,6 +41,7 @@ class EventStreamRestServlet(RestServlet): chunk = yield handler.get_stream(auth_user.to_string(), pagin_config, timeout=timeout) + defer.returnValue((200, chunk)) def on_OPTIONS(self, request): diff --git a/synapse/rest/initial_sync.py b/synapse/rest/initial_sync.py index ce7937a919..d18c4c0f60 100644 --- a/synapse/rest/initial_sync.py +++ b/synapse/rest/initial_sync.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.streams import PaginationConfig +from synapse.streams.config import PaginationConfig from base import RestServlet, client_path_pattern diff --git a/synapse/rest/room.py b/synapse/rest/room.py index ebe4e24432..a10b3b54f9 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -18,9 +18,9 @@ from twisted.internet import defer from base import RestServlet, client_path_pattern from synapse.api.errors import SynapseError, Codes +from synapse.streams.config import PaginationConfig from synapse.api.events.room import RoomMemberEvent from synapse.api.constants import Membership -from synapse.api.streams import PaginationConfig import json import logging diff --git a/synapse/server.py b/synapse/server.py index 24f3a88103..c29c61220d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -22,7 +22,7 @@ from synapse.federation import initialize_http_replication from synapse.federation.handler import FederationEventHandler from synapse.api.events.factory import EventFactory -from synapse.api.notifier import Notifier +from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers from synapse.rest import RestServletFactory @@ -32,6 +32,7 @@ from synapse.types import UserID, RoomAlias, RoomID from synapse.util import Clock from synapse.util.distributor import Distributor from synapse.util.lockutils import LockManager +from synapse.streams.events import EventSources class BaseHomeServer(object): @@ -73,6 +74,7 @@ class BaseHomeServer(object): 'resource_for_federation', 'resource_for_web_client', 'resource_for_content_repo', + 'event_sources', ] def __init__(self, hostname, **kwargs): @@ -195,6 +197,9 @@ class HomeServer(BaseHomeServer): def build_distributor(self): return Distributor() + def build_event_sources(self): + return EventSources(self) + def register_servlets(self): """ Register all servlets associated with this HomeServer. """ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index cae80563b4..6a22d5aead 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -174,7 +174,7 @@ class StreamStore(SQLBaseStore): "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " - "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { @@ -293,5 +293,5 @@ class StreamStore(SQLBaseStore): defer.returnValue("s1") return - key = res[0]["m"] + 1 + key = res[0]["m"] defer.returnValue("s%d" % (key,)) diff --git a/synapse/streams/__init__.py b/synapse/streams/__init__.py new file mode 100644 index 0000000000..fe8a073cd3 --- /dev/null +++ b/synapse/streams/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/streams/config.py b/synapse/streams/config.py new file mode 100644 index 0000000000..2434844d80 --- /dev/null +++ b/synapse/streams/config.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.api.errors import SynapseError +from synapse.types import StreamToken + +import logging + + +logger = logging.getLogger(__name__) + + +class PaginationConfig(object): + + """A configuration object which stores pagination parameters.""" + + def __init__(self, from_token=None, to_token=None, direction='f', + limit=0): + self.from_token = from_token + self.to_token = to_token + self.direction = 'f' if direction == 'f' else 'b' + self.limit = int(limit) + + @classmethod + def from_request(cls, request, raise_invalid_params=True): + def get_param(name, default=None): + lst = request.args.get(name, []) + if len(lst) > 1: + raise SynapseError( + 400, "%s must be specified only once" % (name,) + ) + elif len(lst) == 1: + return lst[0] + else: + return default + + direction = get_param("dir", 'f') + if direction not in ['f', 'b']: + raise SynapseError(400, "'dir' parameter is invalid.") + + from_tok = get_param("from") + to_tok = get_param("to") + + try: + if from_tok == "END": + from_tok = None # For backwards compat. + elif from_tok: + from_tok = StreamToken.from_string(from_tok) + except: + raise SynapseError(400, "'from' paramater is invalid") + + try: + if to_tok: + to_tok = StreamToken.from_string(to_tok) + except: + raise SynapseError(400, "'to' paramater is invalid") + + limit = get_param("limit", "0") + if not limit.isdigit(): + raise SynapseError(400, "'limit' parameter must be an integer.") + + try: + return PaginationConfig(from_tok, to_tok, direction, limit) + except: + logger.exception("Failed to create pagination config") + raise SynapseError(400, "Invalid request.") + + def __str__(self): + return ( + "<PaginationConfig from_tok=%s, to_tok=%s, " + "direction=%s, limit=%s>" + ) % (self.from_tok, self.to_tok, self.direction, self.limit) diff --git a/synapse/streams/events.py b/synapse/streams/events.py new file mode 100644 index 0000000000..2e6ea6ca26 --- /dev/null +++ b/synapse/streams/events.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.constants import Membership +from synapse.types import StreamToken + + +class RoomEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_token, limit): + # We just ignore the key for now. + + to_key = yield self.get_current_token_part() + + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_token.events_key, + to_key=to_key, + room_id=None, + limit=limit, + ) + + end_token = from_token.copy_and_replace("events_key", end_key) + + defer.returnValue((events, end_token)) + + def get_current_token_part(self): + return self.store.get_room_events_max_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, pagination_config, key): + from_token = pagination_config.from_token + to_token = pagination_config.to_token + limit = pagination_config.limit + direction = pagination_config.direction + + to_key = to_token.events_key if to_token else None + + events, next_key = yield self.store.paginate_room_events( + room_id=key, + from_key=from_token.events_key, + to_key=to_key, + direction=direction, + limit=limit, + with_feedback=True + ) + + next_token = from_token.copy_and_replace("events_key", next_key) + + defer.returnValue((events, next_token)) + + +class PresenceSource(object): + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + + def get_new_events_for_user(self, user, from_token, limit): + from_key = int(from_token.presence_key) + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if from_key < cachemap[k].serial] + + if updates: + clock = self.clock + + latest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + end_token = from_token.copy_and_replace( + "presence_key", latest_serial + ) + return ((data, end_token)) + else: + end_token = from_token.copy_and_replace( + "presence_key", presence._user_cachemap_latest_serial + ) + return (([], end_token)) + + def get_current_token_part(self): + presence = self.hs.get_handlers().presence_handler + return presence._user_cachemap_latest_serial + + def get_pagination_rows(self, user, pagination_config, key): + # TODO (erikj): Does this make sense? Ordering? + + from_token = pagination_config.from_token + to_token = pagination_config.to_token + + from_key = int(from_token.presence_key) + + if to_token: + to_key = int(to_token.presence_key) + else: + to_key = -1 + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if to_key < cachemap[k].serial < from_key] + + if updates: + clock = self.clock + + earliest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + if to_token: + next_token = to_token + else: + next_token = from_token + + next_token = next_token.copy_and_replace( + "presence_key", earliest_serial + ) + return ((data, next_token)) + else: + if not to_token: + to_token = from_token.copy_and_replace( + "presence_key", 0 + ) + return (([], to_token)) + + +class EventSources(object): + SOURCE_TYPES = { + "room": RoomEventSource, + "presence": PresenceSource, + } + + def __init__(self, hs): + self.sources = { + name: cls(hs) + for name, cls in EventSources.SOURCE_TYPES.items() + } + + @staticmethod + def create_token(events_key, presence_key): + return StreamToken(events_key=events_key, presence_key=presence_key) + + @defer.inlineCallbacks + def get_current_token(self): + events_key = yield self.sources["room"].get_current_token_part() + presence_key = yield self.sources["presence"].get_current_token_part() + token = EventSources.create_token(events_key, presence_key) + defer.returnValue(token) + + +class StreamSource(object): + def get_new_events_for_user(self, user, from_token, limit): + raise NotImplementedError("get_new_events_for_user") + + def get_current_token_part(self): + raise NotImplementedError("get_current_token_part") + + def get_pagination_rows(self, user, pagination_config, key): + raise NotImplementedError("get_rows") diff --git a/synapse/types.py b/synapse/types.py index fd6a3d1d72..63154855dd 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -92,3 +92,36 @@ class RoomAlias(DomainSpecificString): class RoomID(DomainSpecificString): """Structure representing a room id. """ SIGIL = "!" + + +class StreamToken( + namedtuple( + "Token", + ("events_key", "presence_key") + ) +): + _SEPARATOR = "_" + + @classmethod + def from_string(cls, string): + try: + events_key, presence_key = string.split(cls._SEPARATOR) + + return cls( + events_key=events_key, + presence_key=presence_key, + ) + except: + raise SynapseError(400, "Invalid Token") + + def to_string(self): + return "".join([ + str(self.events_key), + self._SEPARATOR, + str(self.presence_key), + ]) + + def copy_and_replace(self, key, new_value): + d = self._asdict() + d[key] = new_value + return StreamToken(**d) |