From 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Aug 2014 18:57:46 +0100 Subject: WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model. --- synapse/handlers/events.py | 124 +++++++------------------------------------ synapse/handlers/presence.py | 32 ----------- synapse/handlers/room.py | 46 ++++++++-------- 3 files changed, 44 insertions(+), 158 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 6bb797caf2..2d7bd5083b 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -16,18 +16,14 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.api.streams.event import ( - EventStream, EventsStreamData -) -from synapse.handlers.presence import PresenceStreamData +import logging -class EventStreamHandler(BaseHandler): - stream_data_classes = [ - EventsStreamData, - PresenceStreamData, - ] +logger = logging.getLogger(__name__) + + +class EventStreamHandler(BaseHandler): def __init__(self, hs): super(EventStreamHandler, self).__init__(hs) @@ -43,104 +39,22 @@ class EventStreamHandler(BaseHandler): self.clock = hs.get_clock() - def get_event_stream_token(self, stream_type, store_id, start_token): - """Return the next token after this event. - - Args: - stream_type (str): The StreamData.EVENT_TYPE - store_id (int): The new storage ID assigned from the data store. - start_token (str): The token the user started with. - Returns: - str: The end token. - """ - for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes): - if stream_cls.EVENT_TYPE == stream_type: - # this is the stream for this event, so replace this part of - # the token - store_ids = start_token.split(EventStream.SEPARATOR) - store_ids[i] = str(store_id) - return EventStream.SEPARATOR.join(store_ids) - raise RuntimeError("Didn't find a stream type %s" % stream_type) + self.notifier = hs.get_notifier() @defer.inlineCallbacks def get_stream(self, auth_user_id, pagin_config, timeout=0): - """Gets events as an event stream for this user. - - This function looks for interesting *events* for this user. This is - different from the notifier, which looks for interested *users* who may - want to know about a single event. - - Args: - auth_user_id (str): The user requesting their event stream. - pagin_config (synapse.api.streams.PaginationConfig): The config to - use when obtaining the stream. - timeout (int): The max time to wait for an incoming event in ms. - Returns: - A pagination stream API dict - """ auth_user = self.hs.parse_userid(auth_user_id) - stream_id = object() - - try: - if auth_user not in self._streams_per_user: - self._streams_per_user[auth_user] = 0 - if auth_user in self._stop_timer_per_user: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user)) - else: - self.distributor.fire( - "started_user_eventstream", auth_user - ) - self._streams_per_user[auth_user] += 1 - - # construct an event stream with the correct data ordering - stream_data_list = [] - for stream_class in EventStreamHandler.stream_data_classes: - stream_data_list.append(stream_class(self.hs)) - event_stream = EventStream(auth_user_id, stream_data_list) - - # fix unknown tokens to known tokens - pagin_config = yield event_stream.fix_tokens(pagin_config) - - # register interest in receiving new events - self.notifier.store_events_for(user_id=auth_user_id, - stream_id=stream_id, - from_tok=pagin_config.from_tok) - - # see if we can grab a chunk now - data_chunk = yield event_stream.get_chunk(config=pagin_config) - - # if there are previous events, return those. If not, wait on the - # new events for 'timeout' seconds. - if len(data_chunk["chunk"]) == 0 and timeout != 0: - results = yield defer.maybeDeferred( - self.notifier.get_events_for, - user_id=auth_user_id, - stream_id=stream_id, - timeout=timeout - ) - if results: - defer.returnValue(results) - - defer.returnValue(data_chunk) - finally: - # cleanup - self.notifier.purge_events_for(user_id=auth_user_id, - stream_id=stream_id) - - self._streams_per_user[auth_user] -= 1 - if not self._streams_per_user[auth_user]: - del self._streams_per_user[auth_user] - - # 10 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - self.distributor.fire( - "stopped_user_eventstream", auth_user - ) - del self._stop_timer_per_user[auth_user] - - self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(5, _later) - ) + if pagin_config.from_token is None: + pagin_config.from_token = None + + events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start_token": tokens[0].to_string(), + "end_token": tokens[1].to_string(), + } + + defer.returnValue(chunk) + diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index be10162db5..30d6269e2e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.api.streams import StreamData from ._base import BaseHandler @@ -682,41 +681,10 @@ class PresenceHandler(BaseHandler): user=observed_user, clock=self.clock ), - stream_type=PresenceStreamData, store_id=statuscache.serial ) -class PresenceStreamData(StreamData): - def __init__(self, hs): - super(PresenceStreamData, self).__init__(hs) - self.presence = hs.get_handlers().presence_handler - - def get_rows(self, user_id, from_key, to_key, limit, direction): - from_key = int(from_key) - to_key = int(to_key) - - cachemap = self.presence._user_cachemap - - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial <= to_key] - - if updates: - clock = self.presence.clock - - latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - return ((data, latest_serial)) - else: - return (([], self.presence._user_cachemap_latest_serial)) - - def max_token(self): - return self.presence._user_cachemap_latest_serial - -PresenceStreamData.EVENT_TYPE = PresenceStreamData - - class UserPresenceCache(object): """Store an observed user's state and status message. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..20b4bbb665 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,6 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -115,13 +113,24 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources[0] + + if pagin_config.from_token: + from_token = pagin_config.from_token + else: + from_token = yield self.hs.get_event_sources().get_current_token() + + events, next_token = yield data_source.get_pagination_rows( + from_token, pagin_config.to_token, pagin_config.limit, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start_token": from_token.to_string(), + "end_token": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -258,18 +267,15 @@ class MessageHandler(BaseHandler): rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + # FIXME (erikj): We need to not generate this token, + now_token = yield self.hs.get_event_sources().get_current_token() # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources[1] + presence = yield presence_stream.get_new_events_for_user( + user_id, now_token, None, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -291,7 +297,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key.to_string(), ) d["messages"] = { @@ -305,9 +311,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token.to_string()} defer.returnValue(ret) -- cgit 1.5.1 From 67c5f89244b9ff5f1deca199f35ef7240d0549cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Aug 2014 19:40:29 +0100 Subject: Enable presence again. Fix up api to match old api. --- synapse/handlers/events.py | 13 ++++++++++--- synapse/handlers/presence.py | 4 ++-- synapse/handlers/room.py | 4 ++-- synapse/notifier.py | 20 +++++++++++++++++++- synapse/streams/events.py | 4 ++-- 5 files changed, 35 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 2d7bd5083b..8c34776245 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.api.events import SynapseEvent + from ._base import BaseHandler import logging @@ -50,10 +52,15 @@ class EventStreamHandler(BaseHandler): events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout) + chunks = [ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in events + ] + chunk = { - "chunk": [e.get_dict() for e in events], - "start_token": tokens[0].to_string(), - "end_token": tokens[1].to_string(), + "chunk": chunks, + "start": tokens[0].to_string(), + "end": tokens[1].to_string(), } defer.returnValue(chunk) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 30d6269e2e..8408266da0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -142,7 +142,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): defer.returnValue(True) - return + #return # FIXME (erikj): This code path absolutely kills the database. assert(observed_user.is_mine) @@ -189,7 +189,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def set_state(self, target_user, auth_user, state): - return + # return # TODO (erikj): Turn this back on. Why did we end up sending EDUs # everywhere? diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 20b4bbb665..6fbe84ea40 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -126,8 +126,8 @@ class MessageHandler(BaseHandler): chunk = { "chunk": [e.get_dict() for e in events], - "start_token": from_token.to_string(), - "end_token": next_token.to_string(), + "start": from_token.to_string(), + "end": next_token.to_string(), } defer.returnValue(chunk) diff --git a/synapse/notifier.py b/synapse/notifier.py index 1911fd20ae..df9be29f3d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -91,7 +91,25 @@ class Notifier(object): ) def on_new_user_event(self, *args, **kwargs): - pass + source = self.event_sources.sources[1] + + listeners = self.signal_keys_to_users.get( + (source.SIGNAL_NAME, "moose"), + [] + ) + + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + key="moose", + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) def get_events_for(self, user, pagination_config, timeout): deferred = defer.Deferred() diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 887c792104..27c7734b36 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -97,7 +97,7 @@ class PresenceStreamSource(object): data = [x[1].make_event(user=x[0], clock=clock) for x in updates] end_token = from_token.copy_and_replace( - "presence_key", latest_serial + "presence_key", latest_serial + 1 ) return ((data, end_token)) else: @@ -107,7 +107,7 @@ class PresenceStreamSource(object): return (([], end_token)) def get_keys_for_user(self, user): - return defer.succeed([]) + return defer.succeed(["moose"]) def get_current_token_part(self): presence = self.hs.get_handlers().presence_handler -- cgit 1.5.1 From bd16b93e8f7143b4d2e98794a01aa62a060505d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 14:03:27 +0100 Subject: Implement presence event source. Change the way the notifier indexes listeners --- synapse/handlers/events.py | 7 +++- synapse/handlers/presence.py | 7 +--- synapse/handlers/room.py | 15 +++++--- synapse/notifier.py | 85 ++++++++++++++++++++++---------------------- synapse/streams/events.py | 73 ++++++++++++++++++++++++------------- 5 files changed, 107 insertions(+), 80 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 8c34776245..aabec37fc0 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -50,7 +50,12 @@ class EventStreamHandler(BaseHandler): if pagin_config.from_token is None: pagin_config.from_token = None - events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout) + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(auth_user) + + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) chunks = [ e.get_dict() if isinstance(e, SynapseEvent) else e diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8408266da0..9a690258de 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -676,12 +676,7 @@ class PresenceHandler(BaseHandler): statuscache.make_event(user=observed_user, clock=self.clock) self.notifier.on_new_user_event( - observer_user.to_string(), - event_data=statuscache.make_event( - user=observed_user, - clock=self.clock - ), - store_id=statuscache.serial + [observer_user], ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6fbe84ea40..19ade10a91 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -120,8 +120,11 @@ class MessageHandler(BaseHandler): else: from_token = yield self.hs.get_event_sources().get_current_token() + user = self.hs.parse_userid(user_id) + events, next_token = yield data_source.get_pagination_rows( - from_token, pagin_config.to_token, pagin_config.limit, room_id + user, from_token, pagin_config.to_token, pagin_config.limit, + room_id ) chunk = { @@ -265,6 +268,8 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) + user = self.hs.parse_userid(user_id) + rooms_ret = [] # FIXME (erikj): We need to not generate this token, @@ -272,8 +277,8 @@ class MessageHandler(BaseHandler): # FIXME (erikj): Fix this. presence_stream = self.hs.get_event_sources().sources[1] - presence = yield presence_stream.get_new_events_for_user( - user_id, now_token, None, None + presence, _ = yield presence_stream.get_pagination_rows( + user, now_token, None, None, None ) limit = pagin_config.limit @@ -297,7 +302,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.events_key.to_string(), + end_token=now_token.events_key, ) d["messages"] = { @@ -311,7 +316,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token.to_string()} + ret = {"rooms": rooms_ret, "presence": presence, "end": now_token.to_string()} defer.returnValue(ret) diff --git a/synapse/notifier.py b/synapse/notifier.py index df9be29f3d..a69d5343cb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -24,14 +24,14 @@ logger = logging.getLogger(__name__) class _NotificationListener(object): - def __init__(self, user, from_token, limit, timeout, deferred): + def __init__(self, user, rooms, from_token, limit, timeout, deferred): self.user = user self.from_token = from_token self.limit = limit self.timeout = timeout self.deferred = deferred - self.signal_key_list = [] + self.rooms = rooms self.pending_notifications = [] @@ -43,36 +43,39 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass - for signal, key in self.signal_key_list: - lst = notifier.signal_keys_to_users.get((signal, key), []) + for room in self.rooms: + lst = notifier.rooms_to_listeners.get(room, set()) + lst.discard(self) + + notifier.user_to_listeners.get(self.user, set()).discard(self) - try: - lst.remove(self) - except: - pass class Notifier(object): def __init__(self, hs): self.hs = hs - self.signal_keys_to_users = {} + self.rooms_to_listeners = {} + self.user_to_listeners = {} self.event_sources = hs.get_event_sources() + hs.get_distributor().observe( + "user_joined_room", self._user_joined_room + ) + @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): + def on_new_room_event(self, event, extra_users=[]): room_id = event.room_id source = self.event_sources.sources[0] - listeners = self.signal_keys_to_users.get( - (source.SIGNAL_NAME, room_id), - [] - ) + listeners = self.rooms_to_listeners.get(room_id, set()).copy() + + for user in extra_users: + listeners |= self.user_to_listeners.get(user, set()).copy() - logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners) logger.debug("on_new_room_event listeners %s", listeners) # TODO (erikj): Can we make this more efficient by hitting the @@ -82,7 +85,6 @@ class Notifier(object): listener.user, listener.from_token, listener.limit, - key=room_id, ) if events: @@ -90,20 +92,23 @@ class Notifier(object): self, events, listener.from_token, end_token ) - def on_new_user_event(self, *args, **kwargs): + @defer.inlineCallbacks + def on_new_user_event(self, users=[], rooms=[]): source = self.event_sources.sources[1] - listeners = self.signal_keys_to_users.get( - (source.SIGNAL_NAME, "moose"), - [] - ) + listeners = set() + + for user in users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + for room in rooms: + listeners |= self.rooms_to_listeners.get(room, set()).copy() for listener in listeners: events, end_token = yield source.get_new_events_for_user( listener.user, listener.from_token, listener.limit, - key="moose", ) if events: @@ -111,23 +116,24 @@ class Notifier(object): self, events, listener.from_token, end_token ) - def get_events_for(self, user, pagination_config, timeout): + def get_events_for(self, user, rooms, pagination_config, timeout): deferred = defer.Deferred() self._get_events( - deferred, user, pagination_config.from_token, + deferred, user, rooms, pagination_config.from_token, pagination_config.limit, timeout ).addErrback(deferred.errback) return deferred @defer.inlineCallbacks - def _get_events(self, deferred, user, from_token, limit, timeout): + def _get_events(self, deferred, user, rooms, from_token, limit, timeout): if not from_token: from_token = yield self.event_sources.get_current_token() listener = _NotificationListener( user, + rooms, from_token, limit, timeout, @@ -137,7 +143,7 @@ class Notifier(object): if timeout: reactor.callLater(timeout/1000, self._timeout_listener, listener) - yield self._register_with_keys(listener) + self._register_with_keys(listener) yield self._check_for_updates(listener) return @@ -152,25 +158,13 @@ class Notifier(object): listener.from_token, ) - @defer.inlineCallbacks @log_function def _register_with_keys(self, listener): - signals_keys = {} - - # TODO (erikj): This can probably be replaced by a DeferredList - for source in self.event_sources.sources: - keys = yield source.get_keys_for_user(listener.user) - signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys) + for room in listener.rooms: + s = self.rooms_to_listeners.setdefault(room, set()) + s.add(listener) - for signal, keys in signals_keys.items(): - for key in keys: - s = self.signal_keys_to_users.setdefault((signal, key), []) - s.append(listener) - listener.signal_key_list.append((signal, key)) - - logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users) - - defer.returnValue(listener) + self.user_to_listeners.setdefault(listener.user, set()).add(listener) @defer.inlineCallbacks @log_function @@ -195,8 +189,13 @@ class Notifier(object): end_token = from_token - if events: listener.notify(self, events, listener.from_token, end_token) defer.returnValue(listener) + + def _user_joined_room(self, user, room_id): + new_listeners = self.user_to_listeners.get(user, set()) + + listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners |= new_listeners diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 27c7734b36..36174a811b 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -26,16 +26,7 @@ class RoomEventSource(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def get_keys_for_user(self, user): - events = yield self.store.get_rooms_for_user_where_membership_is( - user.to_string(), - (Membership.JOIN,), - ) - - defer.returnValue(set([e.room_id for e in events])) - - @defer.inlineCallbacks - def get_new_events_for_user(self, user, from_token, limit, key=None): + def get_new_events_for_user(self, user, from_token, limit): # We just ignore the key for now. to_key = yield self.get_current_token_part() @@ -56,7 +47,7 @@ class RoomEventSource(object): return self.store.get_room_events_max_id() @defer.inlineCallbacks - def get_pagination_rows(self, from_token, to_token, limit, key): + def get_pagination_rows(self, user, from_token, to_token, limit, key): to_key = to_token.events_key if to_token else None events, next_key = yield self.store.paginate_room_events( @@ -73,14 +64,14 @@ class RoomEventSource(object): defer.returnValue((events, next_token)) -class PresenceStreamSource(object): - SIGNAL_NAME = "PresenceStreamSource" +class PresenceSource(object): + SIGNAL_NAME = "PresenceSource" def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() - def get_new_events_for_user(self, user, from_token, limit, key=None): + def get_new_events_for_user(self, user, from_token, limit): from_key = int(from_token.presence_key) presence = self.hs.get_handlers().presence_handler @@ -97,7 +88,7 @@ class PresenceStreamSource(object): data = [x[1].make_event(user=x[0], clock=clock) for x in updates] end_token = from_token.copy_and_replace( - "presence_key", latest_serial + 1 + "presence_key", latest_serial ) return ((data, end_token)) else: @@ -106,18 +97,52 @@ class PresenceStreamSource(object): ) return (([], end_token)) - def get_keys_for_user(self, user): - return defer.succeed(["moose"]) - def get_current_token_part(self): presence = self.hs.get_handlers().presence_handler return presence._user_cachemap_latest_serial + def get_pagination_rows(self, user, from_token, to_token, limit, key): + from_key = int(from_token.presence_key) + + if to_token: + to_key = int(to_token.presence_key) + else: + to_key = -1 + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if to_key < cachemap[k].serial < from_key] + + if updates: + clock = self.clock + + earliest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + if to_token: + next_token = to_token + else: + next_token = from_token + + next_token = next_token.copy_and_replace( + "presence_key", earliest_serial + ) + return ((data, next_token)) + else: + if not to_token: + to_token = from_token.copy_and_replace( + "presence_key", 0 + ) + return (([], to_token)) + class EventSources(object): SOURCE_TYPES = [ RoomEventSource, - PresenceStreamSource, + PresenceSource, ] def __init__(self, hs): @@ -130,15 +155,13 @@ class EventSources(object): @defer.inlineCallbacks def get_current_token(self): events_key = yield self.sources[0].get_current_token_part() - token = EventSources.create_token(events_key, "0") + presence_key = yield self.sources[1].get_current_token_part() + token = EventSources.create_token(events_key, presence_key) defer.returnValue(token) class StreamSource(object): - def get_keys_for_user(self, user): - raise NotImplementedError("get_keys_for_user") - - def get_new_events_for_user(self, user, from_token, limit, key=None): + def get_new_events_for_user(self, user, from_token, limit): raise NotImplementedError("get_new_events_for_user") def get_current_token_part(self): @@ -146,6 +169,6 @@ class StreamSource(object): class PaginationSource(object): - def get_pagination_rows(self, from_token, to_token, limit, key): + def get_pagination_rows(self, user, from_token, to_token, limit, key): raise NotImplementedError("get_rows") -- cgit 1.5.1 From 77a255c7c38a6dab80add45632ffc574099566c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 14:19:39 +0100 Subject: PEP8 tweaks. --- synapse/handlers/events.py | 1 - synapse/handlers/room.py | 12 +++++++++--- synapse/streams/config.py | 7 +++---- synapse/streams/events.py | 1 - synapse/types.py | 1 - 5 files changed, 12 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index b336b292d3..e08231406d 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -59,7 +59,6 @@ class EventStreamHandler(BaseHandler): ) self._streams_per_user[auth_user] += 1 - if pagin_config.from_token is None: pagin_config.from_token = None diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 19ade10a91..bf66d74548 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -311,12 +311,18 @@ class MessageHandler(BaseHandler): "end": token[1], } - current_state = yield self.store.get_current_state(event.room_id) + current_state = yield self.store.get_current_state( + event.room_id + ) d["state"] = [c.get_dict() for c in current_state] except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence, "end": now_token.to_string()} + ret = { + "rooms": rooms_ret, + "presence": presence, + "end": now_token.to_string() + } defer.returnValue(ret) @@ -499,7 +505,7 @@ class RoomMemberHandler(BaseHandler): for entry in member_list ] chunk_data = { - "start": "START", # FIXME (erikj): START is no longer a valid value + "start": "START", # FIXME (erikj): START is no longer valid "end": "END", "chunk": event_list } diff --git a/synapse/streams/config.py b/synapse/streams/config.py index b6ffbab1e7..69c7145a36 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -27,7 +27,9 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_token = StreamToken.from_string(from_tok) if from_tok else None + self.from_token = ( + StreamToken.from_string(from_tok) if from_tok else None + ) self.to_token = StreamToken.from_string(to_tok) if to_tok else None self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) @@ -67,6 +69,3 @@ class PaginationConfig(object): "" ) % (self.from_tok, self.to_tok, self.direction, self.limit) - - - diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 36174a811b..bf48df5b79 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -171,4 +171,3 @@ class StreamSource(object): class PaginationSource(object): def get_pagination_rows(self, user, from_token, to_token, limit, key): raise NotImplementedError("get_rows") - diff --git a/synapse/types.py b/synapse/types.py index c8936b5758..63154855dd 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -121,7 +121,6 @@ class StreamToken( str(self.presence_key), ]) - def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value -- cgit 1.5.1 From 05672a6a8ca66bd2165217c06c5478a06b0cd952 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 15:25:21 +0100 Subject: Convert get_paginat_rows to use PaginationConfig. This allows people to supply directions. --- synapse/handlers/room.py | 15 +++++------ synapse/streams/config.py | 67 ++++++++++++++++++++++++++++------------------- synapse/streams/events.py | 20 +++++++++----- 3 files changed, 61 insertions(+), 41 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bf66d74548..a32c22db33 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,6 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) +from synapse.streams.config import PaginationConfig from synapse.util import stringutils from ._base import BaseHandler @@ -115,21 +116,18 @@ class MessageHandler(BaseHandler): data_source = self.hs.get_event_sources().sources[0] - if pagin_config.from_token: - from_token = pagin_config.from_token - else: - from_token = yield self.hs.get_event_sources().get_current_token() + if not pagin_config.from_token: + pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() user = self.hs.parse_userid(user_id) events, next_token = yield data_source.get_pagination_rows( - user, from_token, pagin_config.to_token, pagin_config.limit, - room_id + user, pagin_config, room_id ) chunk = { "chunk": [e.get_dict() for e in events], - "start": from_token.to_string(), + "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), } @@ -277,8 +275,9 @@ class MessageHandler(BaseHandler): # FIXME (erikj): Fix this. presence_stream = self.hs.get_event_sources().sources[1] + pagination_config = PaginationConfig(from_token=now_token) presence, _ = yield presence_stream.get_pagination_rows( - user, now_token, None, None, None + user, pagination_config, None ) limit = pagin_config.limit diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 69c7145a36..2434844d80 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -26,40 +26,53 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" - def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_token = ( - StreamToken.from_string(from_tok) if from_tok else None - ) - self.to_token = StreamToken.from_string(to_tok) if to_tok else None + def __init__(self, from_token=None, to_token=None, direction='f', + limit=0): + self.from_token = from_token + self.to_token = to_token self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) @classmethod def from_request(cls, request, raise_invalid_params=True): - params = { - "direction": 'f', - } - - query_param_mappings = [ # 3-tuple of qp_key, attribute, rules - ("from", "from_tok", lambda x: type(x) == str), - ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()), - ("dir", "direction", lambda x: x == 'f' or x == 'b'), - ] - - for qp, attr, is_valid in query_param_mappings: - if qp in request.args: - if is_valid(request.args[qp][0]): - params[attr] = request.args[qp][0] - elif raise_invalid_params: - raise SynapseError(400, "%s parameter is invalid." % qp) - - if "from_tok" in params and params["from_tok"] == "END": - # TODO (erikj): This is for compatibility only. - del params["from_tok"] + def get_param(name, default=None): + lst = request.args.get(name, []) + if len(lst) > 1: + raise SynapseError( + 400, "%s must be specified only once" % (name,) + ) + elif len(lst) == 1: + return lst[0] + else: + return default + + direction = get_param("dir", 'f') + if direction not in ['f', 'b']: + raise SynapseError(400, "'dir' parameter is invalid.") + + from_tok = get_param("from") + to_tok = get_param("to") + + try: + if from_tok == "END": + from_tok = None # For backwards compat. + elif from_tok: + from_tok = StreamToken.from_string(from_tok) + except: + raise SynapseError(400, "'from' paramater is invalid") + + try: + if to_tok: + to_tok = StreamToken.from_string(to_tok) + except: + raise SynapseError(400, "'to' paramater is invalid") + + limit = get_param("limit", "0") + if not limit.isdigit(): + raise SynapseError(400, "'limit' parameter must be an integer.") try: - return PaginationConfig(**params) + return PaginationConfig(from_tok, to_tok, direction, limit) except: logger.exception("Failed to create pagination config") raise SynapseError(400, "Invalid request.") diff --git a/synapse/streams/events.py b/synapse/streams/events.py index bf48df5b79..8a84a9d392 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -47,14 +47,19 @@ class RoomEventSource(object): return self.store.get_room_events_max_id() @defer.inlineCallbacks - def get_pagination_rows(self, user, from_token, to_token, limit, key): + def get_pagination_rows(self, user, pagination_config, key): + from_token = pagination_config.from_token + to_token = pagination_config.to_token + limit = pagination_config.limit + direction = pagination_config.direction + to_key = to_token.events_key if to_token else None events, next_key = yield self.store.paginate_room_events( room_id=key, from_key=from_token.events_key, to_key=to_key, - direction='b', + direction=direction, limit=limit, with_feedback=True ) @@ -101,7 +106,12 @@ class PresenceSource(object): presence = self.hs.get_handlers().presence_handler return presence._user_cachemap_latest_serial - def get_pagination_rows(self, user, from_token, to_token, limit, key): + def get_pagination_rows(self, user, pagination_config, key): + # TODO (erikj): Does this make sense? Ordering? + + from_token = pagination_config.from_token + to_token = pagination_config.to_token + from_key = int(from_token.presence_key) if to_token: @@ -167,7 +177,5 @@ class StreamSource(object): def get_current_token_part(self): raise NotImplementedError("get_current_token_part") - -class PaginationSource(object): - def get_pagination_rows(self, user, from_token, to_token, limit, key): + def get_pagination_rows(self, user, pagination_config, key): raise NotImplementedError("get_rows") -- cgit 1.5.1 From bfe9faad5abf429b7023aaaeb3ba3200a75bf485 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 15:33:52 +0100 Subject: Index sources in a nicer fashion. --- synapse/handlers/room.py | 4 ++-- synapse/notifier.py | 6 +++--- synapse/streams/events.py | 21 ++++++++++----------- 3 files changed, 15 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a32c22db33..faea30b44e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -114,7 +114,7 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = self.hs.get_event_sources().sources[0] + data_source = self.hs.get_event_sources().sources["room"] if not pagin_config.from_token: pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() @@ -274,7 +274,7 @@ class MessageHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() # FIXME (erikj): Fix this. - presence_stream = self.hs.get_event_sources().sources[1] + presence_stream = self.hs.get_event_sources().sources["presence"] pagination_config = PaginationConfig(from_token=now_token) presence, _ = yield presence_stream.get_pagination_rows( user, pagination_config, None diff --git a/synapse/notifier.py b/synapse/notifier.py index a69d5343cb..b969011b32 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -69,7 +69,7 @@ class Notifier(object): def on_new_room_event(self, event, extra_users=[]): room_id = event.room_id - source = self.event_sources.sources[0] + source = self.event_sources.sources["room"] listeners = self.rooms_to_listeners.get(room_id, set()).copy() @@ -94,7 +94,7 @@ class Notifier(object): @defer.inlineCallbacks def on_new_user_event(self, users=[], rooms=[]): - source = self.event_sources.sources[1] + source = self.event_sources.sources["presence"] listeners = set() @@ -176,7 +176,7 @@ class Notifier(object): limit = listener.limit # TODO (erikj): DeferredList? - for source in self.event_sources.sources: + for source in self.event_sources.sources.values(): stuff, new_token = yield source.get_new_events_for_user( listener.user, from_token, diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 8a84a9d392..2e6ea6ca26 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -20,8 +20,6 @@ from synapse.types import StreamToken class RoomEventSource(object): - SIGNAL_NAME = "RoomEventSource" - def __init__(self, hs): self.store = hs.get_datastore() @@ -70,8 +68,6 @@ class RoomEventSource(object): class PresenceSource(object): - SIGNAL_NAME = "PresenceSource" - def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() @@ -150,13 +146,16 @@ class PresenceSource(object): class EventSources(object): - SOURCE_TYPES = [ - RoomEventSource, - PresenceSource, - ] + SOURCE_TYPES = { + "room": RoomEventSource, + "presence": PresenceSource, + } def __init__(self, hs): - self.sources = [t(hs) for t in EventSources.SOURCE_TYPES] + self.sources = { + name: cls(hs) + for name, cls in EventSources.SOURCE_TYPES.items() + } @staticmethod def create_token(events_key, presence_key): @@ -164,8 +163,8 @@ class EventSources(object): @defer.inlineCallbacks def get_current_token(self): - events_key = yield self.sources[0].get_current_token_part() - presence_key = yield self.sources[1].get_current_token_part() + events_key = yield self.sources["room"].get_current_token_part() + presence_key = yield self.sources["presence"].get_current_token_part() token = EventSources.create_token(events_key, presence_key) defer.returnValue(token) -- cgit 1.5.1 From 7917ff1271b12aa2e7b11f714d63e8288d4fe733 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 16:09:48 +0100 Subject: Turn off presence again. --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9a690258de..c479908f61 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -142,7 +142,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): defer.returnValue(True) - #return + return # FIXME (erikj): This code path absolutely kills the database. assert(observed_user.is_mine) @@ -189,7 +189,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def set_state(self, target_user, auth_user, state): - # return + return # TODO (erikj): Turn this back on. Why did we end up sending EDUs # everywhere? -- cgit 1.5.1 From 8af5e360d62601484214ed533ee35936229145bf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 16:23:33 +0100 Subject: Remove store_id from notifier.on_new_room_event calls. --- synapse/handlers/federation.py | 2 +- synapse/handlers/room.py | 19 +++++++------------ 2 files changed, 8 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bfc1ab86f2..62edd5dbdc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -110,7 +110,7 @@ class FederationHandler(BaseHandler): ) if not backfilled: - yield self.notifier.on_new_room_event(event, store_id) + yield self.notifier.on_new_room_event(event) if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index faea30b44e..bac9c073fa 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -94,7 +94,7 @@ class MessageHandler(BaseHandler): event.room_id ) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) yield self.hs.get_federation().handle_new_event(event) @@ -158,7 +158,7 @@ class MessageHandler(BaseHandler): event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id ) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) yield self.hs.get_federation().handle_new_event(event) @@ -240,7 +240,7 @@ class MessageHandler(BaseHandler): ) yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, @@ -405,10 +405,8 @@ class RoomCreationHandler(BaseHandler): ) yield self.state_handler.handle_new_event(config_event) - # store_id = persist... yield self.hs.get_federation().handle_new_event(config_event) - # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} join_event = self.event_factory.create_event( @@ -726,23 +724,20 @@ class RoomMemberHandler(BaseHandler): # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) if membership == Membership.INVITE: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = target_user.domain destinations.append(host) # If we are joining a remote HS, include that. if membership == Membership.JOIN: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = target_user.domain destinations.append(host) event.destinations = list(set(destinations)) yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event, extra_users=[target_user]) class RoomListHandler(BaseHandler): -- cgit 1.5.1 From 52cb5e63247cd44c0d1fbbdb09e436dc3ceb57a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Aug 2014 16:44:29 +0100 Subject: Remove stale FIXMEs --- synapse/handlers/room.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bac9c073fa..760373344d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -270,10 +270,8 @@ class MessageHandler(BaseHandler): rooms_ret = [] - # FIXME (erikj): We need to not generate this token, now_token = yield self.hs.get_event_sources().get_current_token() - # FIXME (erikj): Fix this. presence_stream = self.hs.get_event_sources().sources["presence"] pagination_config = PaginationConfig(from_token=now_token) presence, _ = yield presence_stream.get_pagination_rows( -- cgit 1.5.1