summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/presence.py25
-rw-r--r--synapse/handlers/room.py21
-rw-r--r--synapse/handlers/typing.py2
-rw-r--r--synapse/streams/config.py23
-rw-r--r--synapse/streams/events.py2
6 files changed, 43 insertions, 40 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b2b8549ed..72894869ea 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -115,8 +115,12 @@ class MessageHandler(BaseHandler):
 
         user = self.hs.parse_userid(user_id)
 
-        events, next_token = yield data_source.get_pagination_rows(
-            user, pagin_config, room_id
+        events, next_key = yield data_source.get_pagination_rows(
+            user, pagin_config.get_source_config("room"), room_id
+        )
+
+        next_token = pagin_config.from_token.copy_and_replace(
+            "room_key", next_key
         )
 
         chunk = {
@@ -271,7 +275,7 @@ class MessageHandler(BaseHandler):
         presence_stream = self.hs.get_event_sources().sources["presence"]
         pagination_config = PaginationConfig(from_token=now_token)
         presence, _ = yield presence_stream.get_pagination_rows(
-            user, pagination_config, None
+            user, pagination_config.get_source_config("presence"), None
         )
 
         public_rooms = yield self.store.get_rooms(is_public=True)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 4176367643..13b5b4da93 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -823,15 +823,12 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         # TODO (erikj): Does this make sense? Ordering?
 
-        from_token = pagination_config.from_token
-        to_token = pagination_config.to_token
-
         observer_user = user
 
-        from_key = int(from_token.presence_key)
+        from_key = int(pagination_config.from_key)
 
-        if to_token:
-            to_key = int(to_token.presence_key)
+        if pagination_config.to_key:
+            to_key = int(pagination_config.to_key)
         else:
             to_key = -1
 
@@ -855,21 +852,9 @@ class PresenceEventSource(object):
             earliest_serial = max([x[1].serial for x in updates])
             data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
 
-            if to_token:
-                next_token = to_token
-            else:
-                next_token = from_token
-
-            next_token = next_token.copy_and_replace(
-                "presence_key", earliest_serial
-            )
-            defer.returnValue((data, next_token))
+            defer.returnValue((data, earliest_serial))
         else:
-            if not to_token:
-                to_token = from_token.copy_and_replace(
-                    "presence_key", 0
-                )
-            defer.returnValue(([], to_token))
+            defer.returnValue(([], 0))
 
 
 class UserPresenceCache(object):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 21ae03df0d..81ce1a5907 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -612,23 +612,14 @@ class RoomEventSource(object):
         return self.store.get_room_events_max_id()
 
     @defer.inlineCallbacks
-    def get_pagination_rows(self, user, pagination_config, key):
-        from_token = pagination_config.from_token
-        to_token = pagination_config.to_token
-        limit = pagination_config.limit
-        direction = pagination_config.direction
-
-        to_key = to_token.room_key if to_token else None
-
+    def get_pagination_rows(self, user, config, key):
         events, next_key = yield self.store.paginate_room_events(
             room_id=key,
-            from_key=from_token.room_key,
-            to_key=to_key,
-            direction=direction,
-            limit=limit,
+            from_key=config.from_key,
+            to_key=config.to_key,
+            direction=config.direction,
+            limit=config.limit,
             with_feedback=True
         )
 
-        next_token = from_token.copy_and_replace("room_key", next_key)
-
-        defer.returnValue((events, next_token))
+        defer.returnValue((events, next_key))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0ca4e5c31e..6edfbfa1c4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -158,4 +158,4 @@ class TypingNotificationEventSource(object):
         return 0
 
     def get_pagination_rows(self, user, pagination_config, key):
-        return ([], pagination_config.from_token)
+        return ([], pagination_config.from_key)
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 6483ce2e25..527507e5cd 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -22,6 +22,19 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+class SourcePaginationConfig(object):
+
+    """A configuration object which stores pagination parameters for a
+    specific event source."""
+
+    def __init__(self, from_key=None, to_key=None, direction='f',
+                 limit=0):
+        self.from_key = from_key
+        self.to_key = to_key
+        self.direction = 'f' if direction == 'f' else 'b'
+        self.limit = int(limit)
+
+
 class PaginationConfig(object):
 
     """A configuration object which stores pagination parameters."""
@@ -82,3 +95,13 @@ class PaginationConfig(object):
             "<PaginationConfig from_tok=%s, to_tok=%s, "
             "direction=%s, limit=%s>"
         ) % (self.from_token, self.to_token, self.direction, self.limit)
+
+    def get_source_config(self, source_name):
+        keyname = "%s_key" % source_name
+
+        return SourcePaginationConfig(
+            from_key=getattr(self.from_token, keyname),
+            to_key=getattr(self.to_token, keyname) if self.to_token else None,
+            direction=self.direction,
+            limit=self.limit,
+        )
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 41715436b0..fb698d2d71 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -35,7 +35,7 @@ class NullSource(object):
         return defer.succeed(0)
 
     def get_pagination_rows(self, user, pagination_config, key):
-        return defer.succeed(([], pagination_config.from_token))
+        return defer.succeed(([], pagination_config.from_key))
 
 
 class EventSources(object):