diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/_base.py | 5 | ||||
-rw-r--r-- | synapse/handlers/events.py | 106 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 2 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 39 | ||||
-rw-r--r-- | synapse/handlers/room.py | 69 |
5 files changed, 76 insertions, 145 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 78df9ac53e..85a0f5dff5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -31,7 +31,8 @@ class BaseHandler(object): class BaseRoomHandler(BaseHandler): @defer.inlineCallbacks - def _on_new_room_event(self, event, snapshot, extra_destinations=[]): + def _on_new_room_event(self, event, snapshot, extra_destinations=[], + extra_users=[]): snapshot.fill_out_prev_events(event) store_id = yield self.store.persist_event(event) @@ -43,7 +44,7 @@ class BaseRoomHandler(BaseHandler): ))) event.destinations = list(destinations) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event, extra_users=[]) federation_handler = self.hs.get_handlers().federation_handler yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1bd173acd8..e08231406d 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,19 +15,17 @@ from twisted.internet import defer +from synapse.api.events import SynapseEvent + from ._base import BaseHandler -from synapse.api.streams.event import ( - EventStream, EventsStreamData -) -from synapse.handlers.presence import PresenceStreamData +import logging -class EventStreamHandler(BaseHandler): - stream_data_classes = [ - EventsStreamData, - PresenceStreamData, - ] +logger = logging.getLogger(__name__) + + +class EventStreamHandler(BaseHandler): def __init__(self, hs): super(EventStreamHandler, self).__init__(hs) @@ -43,45 +41,12 @@ class EventStreamHandler(BaseHandler): self.clock = hs.get_clock() - def get_event_stream_token(self, stream_type, store_id, start_token): - """Return the next token after this event. - - Args: - stream_type (str): The StreamData.EVENT_TYPE - store_id (int): The new storage ID assigned from the data store. - start_token (str): The token the user started with. - Returns: - str: The end token. - """ - for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes): - if stream_cls.EVENT_TYPE == stream_type: - # this is the stream for this event, so replace this part of - # the token - store_ids = start_token.split(EventStream.SEPARATOR) - store_ids[i] = str(store_id) - return EventStream.SEPARATOR.join(store_ids) - raise RuntimeError("Didn't find a stream type %s" % stream_type) + self.notifier = hs.get_notifier() @defer.inlineCallbacks def get_stream(self, auth_user_id, pagin_config, timeout=0): - """Gets events as an event stream for this user. - - This function looks for interesting *events* for this user. This is - different from the notifier, which looks for interested *users* who may - want to know about a single event. - - Args: - auth_user_id (str): The user requesting their event stream. - pagin_config (synapse.api.streams.PaginationConfig): The config to - use when obtaining the stream. - timeout (int): The max time to wait for an incoming event in ms. - Returns: - A pagination stream API dict - """ auth_user = self.hs.parse_userid(auth_user_id) - stream_id = object() - try: if auth_user not in self._streams_per_user: self._streams_per_user[auth_user] = 0 @@ -94,41 +59,30 @@ class EventStreamHandler(BaseHandler): ) self._streams_per_user[auth_user] += 1 - # construct an event stream with the correct data ordering - stream_data_list = [] - for stream_class in EventStreamHandler.stream_data_classes: - stream_data_list.append(stream_class(self.hs)) - event_stream = EventStream(auth_user_id, stream_data_list) - - # fix unknown tokens to known tokens - pagin_config = yield event_stream.fix_tokens(pagin_config) - - # register interest in receiving new events - self.notifier.store_events_for(user_id=auth_user_id, - stream_id=stream_id, - from_tok=pagin_config.from_tok) - - # see if we can grab a chunk now - data_chunk = yield event_stream.get_chunk(config=pagin_config) - - # if there are previous events, return those. If not, wait on the - # new events for 'timeout' seconds. - if len(data_chunk["chunk"]) == 0 and timeout != 0: - results = yield defer.maybeDeferred( - self.notifier.get_events_for, - user_id=auth_user_id, - stream_id=stream_id, - timeout=timeout - ) - if results: - defer.returnValue(results) + if pagin_config.from_token is None: + pagin_config.from_token = None - defer.returnValue(data_chunk) - finally: - # cleanup - self.notifier.purge_events_for(user_id=auth_user_id, - stream_id=stream_id) + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(auth_user) + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) + + chunks = [ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in events + ] + + chunk = { + "chunk": chunks, + "start": tokens[0].to_string(), + "end": tokens[1].to_string(), + } + + defer.returnValue(chunk) + + finally: self._streams_per_user[auth_user] -= 1 if not self._streams_per_user[auth_user]: del self._streams_per_user[auth_user] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7253f56322..9023c3d403 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -176,7 +176,7 @@ class FederationHandler(BaseHandler): ) if not backfilled: - yield self.notifier.on_new_room_event(event, store_id) + yield self.notifier.on_new_room_event(event) if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index be10162db5..c479908f61 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.api.streams import StreamData from ._base import BaseHandler @@ -677,46 +676,10 @@ class PresenceHandler(BaseHandler): statuscache.make_event(user=observed_user, clock=self.clock) self.notifier.on_new_user_event( - observer_user.to_string(), - event_data=statuscache.make_event( - user=observed_user, - clock=self.clock - ), - stream_type=PresenceStreamData, - store_id=statuscache.serial + [observer_user], ) -class PresenceStreamData(StreamData): - def __init__(self, hs): - super(PresenceStreamData, self).__init__(hs) - self.presence = hs.get_handlers().presence_handler - - def get_rows(self, user_id, from_key, to_key, limit, direction): - from_key = int(from_key) - to_key = int(to_key) - - cachemap = self.presence._user_cachemap - - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial <= to_key] - - if updates: - clock = self.presence.clock - - latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - return ((data, latest_serial)) - else: - return (([], self.presence._user_cachemap_latest_serial)) - - def max_token(self): - return self.presence._user_cachemap_latest_serial - -PresenceStreamData.EVENT_TYPE = PresenceStreamData - - class UserPresenceCache(object): """Store an observed user's state and status message. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7b4b051888..f01349b339 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData +from synapse.streams.config import PaginationConfig from synapse.util import stringutils from ._base import BaseRoomHandler @@ -107,13 +106,24 @@ class MessageHandler(BaseRoomHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources["room"] + + if not pagin_config.from_token: + pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() + + user = self.hs.parse_userid(user_id) + + events, next_token = yield data_source.get_pagination_rows( + user, pagin_config, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -235,20 +245,18 @@ class MessageHandler(BaseRoomHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) + user = self.hs.parse_userid(user_id) + rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + now_token = yield self.hs.get_event_sources().get_current_token() - # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources["presence"] + pagination_config = PaginationConfig(from_token=now_token) + presence, _ = yield presence_stream.get_pagination_rows( + user, pagination_config, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -270,7 +278,7 @@ class MessageHandler(BaseRoomHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key, ) d["messages"] = { @@ -279,14 +287,18 @@ class MessageHandler(BaseRoomHandler): "end": token[1], } - current_state = yield self.store.get_current_state(event.room_id) + current_state = yield self.store.get_current_state( + event.room_id + ) d["state"] = [c.get_dict() for c in current_state] except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = { + "rooms": rooms_ret, + "presence": presence, + "end": now_token.to_string() + } defer.returnValue(ret) @@ -381,7 +393,6 @@ class RoomCreationHandler(BaseRoomHandler): federation_handler = self.hs.get_handlers().federation_handler yield federation_handler.handle_new_event(config_event, snapshot) - # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} join_event = self.event_factory.create_event( @@ -477,7 +488,7 @@ class RoomMemberHandler(BaseRoomHandler): for entry in member_list ] chunk_data = { - "start": "START", # FIXME (erikj): START is no longer a valid value + "start": "START", # FIXME (erikj): START is no longer valid "end": "END", "chunk": event_list } @@ -701,17 +712,19 @@ class RoomMemberHandler(BaseRoomHandler): # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) if membership == Membership.INVITE: - host = UserID.from_string(target_user_id, self.hs).domain + host = target_user.domain destinations.append(host) # If we are joining a remote HS, include that. if membership == Membership.JOIN: - host = UserID.from_string(target_user_id, self.hs).domain + host = target_user.domain destinations.append(host) return self._on_new_room_event( - event, snapshot, extra_destinations=destinations + event, snapshot, extra_destinations=destinations, + extra_users=[target_user] ) class RoomListHandler(BaseRoomHandler): |