summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/events.py106
-rw-r--r--synapse/handlers/presence.py39
-rw-r--r--synapse/handlers/room.py60
3 files changed, 68 insertions, 137 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1bd173acd8..e08231406d 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -15,19 +15,17 @@
 
 from twisted.internet import defer
 
+from synapse.api.events import SynapseEvent
+
 from ._base import BaseHandler
-from synapse.api.streams.event import (
-    EventStream, EventsStreamData
-)
-from synapse.handlers.presence import PresenceStreamData
 
+import logging
 
-class EventStreamHandler(BaseHandler):
 
-    stream_data_classes = [
-        EventsStreamData,
-        PresenceStreamData,
-    ]
+logger = logging.getLogger(__name__)
+
+
+class EventStreamHandler(BaseHandler):
 
     def __init__(self, hs):
         super(EventStreamHandler, self).__init__(hs)
@@ -43,45 +41,12 @@ class EventStreamHandler(BaseHandler):
 
         self.clock = hs.get_clock()
 
-    def get_event_stream_token(self, stream_type, store_id, start_token):
-        """Return the next token after this event.
-
-        Args:
-            stream_type (str): The StreamData.EVENT_TYPE
-            store_id (int): The new storage ID assigned from the data store.
-            start_token (str): The token the user started with.
-        Returns:
-            str: The end token.
-        """
-        for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
-            if stream_cls.EVENT_TYPE == stream_type:
-                # this is the stream for this event, so replace this part of
-                # the token
-                store_ids = start_token.split(EventStream.SEPARATOR)
-                store_ids[i] = str(store_id)
-                return EventStream.SEPARATOR.join(store_ids)
-        raise RuntimeError("Didn't find a stream type %s" % stream_type)
+        self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
     def get_stream(self, auth_user_id, pagin_config, timeout=0):
-        """Gets events as an event stream for this user.
-
-        This function looks for interesting *events* for this user. This is
-        different from the notifier, which looks for interested *users* who may
-        want to know about a single event.
-
-        Args:
-            auth_user_id (str): The user requesting their event stream.
-            pagin_config (synapse.api.streams.PaginationConfig): The config to
-            use when obtaining the stream.
-            timeout (int): The max time to wait for an incoming event in ms.
-        Returns:
-            A pagination stream API dict
-        """
         auth_user = self.hs.parse_userid(auth_user_id)
 
-        stream_id = object()
-
         try:
             if auth_user not in self._streams_per_user:
                 self._streams_per_user[auth_user] = 0
@@ -94,41 +59,30 @@ class EventStreamHandler(BaseHandler):
                     )
             self._streams_per_user[auth_user] += 1
 
-            # construct an event stream with the correct data ordering
-            stream_data_list = []
-            for stream_class in EventStreamHandler.stream_data_classes:
-                stream_data_list.append(stream_class(self.hs))
-            event_stream = EventStream(auth_user_id, stream_data_list)
-
-            # fix unknown tokens to known tokens
-            pagin_config = yield event_stream.fix_tokens(pagin_config)
-
-            # register interest in receiving new events
-            self.notifier.store_events_for(user_id=auth_user_id,
-                                           stream_id=stream_id,
-                                           from_tok=pagin_config.from_tok)
-
-            # see if we can grab a chunk now
-            data_chunk = yield event_stream.get_chunk(config=pagin_config)
-
-            # if there are previous events, return those. If not, wait on the
-            # new events for 'timeout' seconds.
-            if len(data_chunk["chunk"]) == 0 and timeout != 0:
-                results = yield defer.maybeDeferred(
-                    self.notifier.get_events_for,
-                    user_id=auth_user_id,
-                    stream_id=stream_id,
-                    timeout=timeout
-                )
-                if results:
-                    defer.returnValue(results)
+            if pagin_config.from_token is None:
+                pagin_config.from_token = None
 
-            defer.returnValue(data_chunk)
-        finally:
-            # cleanup
-            self.notifier.purge_events_for(user_id=auth_user_id,
-                                           stream_id=stream_id)
+            rm_handler = self.hs.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
+            events, tokens = yield self.notifier.get_events_for(
+                auth_user, room_ids, pagin_config, timeout
+            )
+
+            chunks = [
+                e.get_dict() if isinstance(e, SynapseEvent) else e
+                for e in events
+            ]
+
+            chunk = {
+                "chunk": chunks,
+                "start": tokens[0].to_string(),
+                "end": tokens[1].to_string(),
+            }
+
+            defer.returnValue(chunk)
+
+        finally:
             self._streams_per_user[auth_user] -= 1
             if not self._streams_per_user[auth_user]:
                 del self._streams_per_user[auth_user]
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index be10162db5..c479908f61 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
-from synapse.api.streams import StreamData
 
 from ._base import BaseHandler
 
@@ -677,46 +676,10 @@ class PresenceHandler(BaseHandler):
         statuscache.make_event(user=observed_user, clock=self.clock)
 
         self.notifier.on_new_user_event(
-            observer_user.to_string(),
-            event_data=statuscache.make_event(
-                user=observed_user,
-                clock=self.clock
-            ),
-            stream_type=PresenceStreamData,
-            store_id=statuscache.serial
+            [observer_user],
         )
 
 
-class PresenceStreamData(StreamData):
-    def __init__(self, hs):
-        super(PresenceStreamData, self).__init__(hs)
-        self.presence = hs.get_handlers().presence_handler
-
-    def get_rows(self, user_id, from_key, to_key, limit, direction):
-        from_key = int(from_key)
-        to_key = int(to_key)
-
-        cachemap = self.presence._user_cachemap
-
-        # TODO(paul): limit, and filter by visibility
-        updates = [(k, cachemap[k]) for k in cachemap
-                   if from_key < cachemap[k].serial <= to_key]
-
-        if updates:
-            clock = self.presence.clock
-
-            latest_serial = max([x[1].serial for x in updates])
-            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
-            return ((data, latest_serial))
-        else:
-            return (([], self.presence._user_cachemap_latest_serial))
-
-    def max_token(self):
-        return self.presence._user_cachemap_latest_serial
-
-PresenceStreamData.EVENT_TYPE = PresenceStreamData
-
-
 class UserPresenceCache(object):
     """Store an observed user's state and status message.
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5a4569ac95..faea30b44e 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError
 from synapse.api.events.room import (
     RoomTopicEvent, RoomMemberEvent, RoomConfigEvent
 )
-from synapse.api.streams.event import EventStream, EventsStreamData
-from synapse.handlers.presence import PresenceStreamData
+from synapse.streams.config import PaginationConfig
 from synapse.util import stringutils
 from ._base import BaseHandler
 
@@ -115,13 +114,24 @@ class MessageHandler(BaseHandler):
         """
         yield self.auth.check_joined_room(room_id, user_id)
 
-        data_source = [
-            EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
-        ]
-        event_stream = EventStream(user_id, data_source)
-        pagin_config = yield event_stream.fix_tokens(pagin_config)
-        data_chunk = yield event_stream.get_chunk(config=pagin_config)
-        defer.returnValue(data_chunk)
+        data_source = self.hs.get_event_sources().sources["room"]
+
+        if not pagin_config.from_token:
+            pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
+
+        user = self.hs.parse_userid(user_id)
+
+        events, next_token = yield data_source.get_pagination_rows(
+            user, pagin_config, room_id
+        )
+
+        chunk = {
+            "chunk": [e.get_dict() for e in events],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        defer.returnValue(chunk)
 
     @defer.inlineCallbacks
     def store_room_data(self, event=None, stamp_event=True):
@@ -256,20 +266,20 @@ class MessageHandler(BaseHandler):
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
 
+        user = self.hs.parse_userid(user_id)
+
         rooms_ret = []
 
-        now_rooms_token = yield self.store.get_room_events_max_id()
+        # FIXME (erikj): We need to not generate this token,
+        now_token = yield self.hs.get_event_sources().get_current_token()
 
         # FIXME (erikj): Fix this.
-        presence_stream = PresenceStreamData(self.hs)
-        now_presence_token = yield presence_stream.max_token()
-        presence = yield presence_stream.get_rows(
-            user_id, 0, now_presence_token, None, None
+        presence_stream = self.hs.get_event_sources().sources["presence"]
+        pagination_config = PaginationConfig(from_token=now_token)
+        presence, _ = yield presence_stream.get_pagination_rows(
+            user, pagination_config, None
         )
 
-        # FIXME (erikj): We need to not generate this token,
-        now_token = "%s_%s" % (now_rooms_token, now_presence_token)
-
         limit = pagin_config.limit
         if not limit:
             limit = 10
@@ -291,7 +301,7 @@ class MessageHandler(BaseHandler):
                 messages, token = yield self.store.get_recent_events_for_room(
                     event.room_id,
                     limit=limit,
-                    end_token=now_rooms_token,
+                    end_token=now_token.events_key,
                 )
 
                 d["messages"] = {
@@ -300,14 +310,18 @@ class MessageHandler(BaseHandler):
                     "end": token[1],
                 }
 
-                current_state = yield self.store.get_current_state(event.room_id)
+                current_state = yield self.store.get_current_state(
+                    event.room_id
+                )
                 d["state"] = [c.get_dict() for c in current_state]
             except:
                 logger.exception("Failed to get snapshot")
 
-        ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
-
-        # logger.debug("snapshot_all_rooms returning: %s", ret)
+        ret = {
+            "rooms": rooms_ret,
+            "presence": presence,
+            "end": now_token.to_string()
+        }
 
         defer.returnValue(ret)
 
@@ -490,7 +504,7 @@ class RoomMemberHandler(BaseHandler):
             for entry in member_list
         ]
         chunk_data = {
-            "start": "START",  # FIXME (erikj): START is no longer a valid value
+            "start": "START",  # FIXME (erikj): START is no longer valid
             "end": "END",
             "chunk": event_list
         }