summary refs log tree commit diff
diff options
context:
space:
mode:
authorPaul "LeoNerd" Evans <paul@matrix.org>2014-08-29 19:13:55 +0100
committerPaul "LeoNerd" Evans <paul@matrix.org>2014-08-29 19:13:55 +0100
commiteec67a675f7ea3545bfba79c6b753f63f7fd9b3b (patch)
tree3f21a7efafb131bd76848cdc50ce574edb95fd2f
parentBugfix for rest presence test - datastore needs to implement profile methods (diff)
downloadsynapse-eec67a675f7ea3545bfba79c6b753f63f7fd9b3b.tar.xz
Have EventSource's get_new_events_for_user() API work only on keys within that source, not overall eventstream tokens
-rw-r--r--synapse/handlers/presence.py14
-rw-r--r--synapse/handlers/room.py8
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/notifier.py30
-rw-r--r--synapse/streams/events.py7
5 files changed, 33 insertions, 30 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1d3b02a9db..05bf145240 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -727,8 +727,8 @@ class PresenceEventSource(object):
         self.hs = hs
         self.clock = hs.get_clock()
 
-    def get_new_events_for_user(self, user, from_token, limit):
-        from_key = int(from_token.presence_key)
+    def get_new_events_for_user(self, user, from_key, limit):
+        from_key = int(from_key)
 
         presence = self.hs.get_handlers().presence_handler
         cachemap = presence._user_cachemap
@@ -743,15 +743,9 @@ class PresenceEventSource(object):
             latest_serial = max([x[1].serial for x in updates])
             data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
 
-            end_token = from_token.copy_and_replace(
-                "presence_key", latest_serial
-            )
-            return ((data, end_token))
+            return ((data, latest_serial))
         else:
-            end_token = from_token.copy_and_replace(
-                "presence_key", presence._user_cachemap_latest_serial
-            )
-            return (([], end_token))
+            return (([], presence._user_cachemap_latest_serial))
 
     def get_current_token_part(self):
         presence = self.hs.get_handlers().presence_handler
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b27bdecd43..ce15420bf4 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -469,22 +469,20 @@ class RoomEventSource(object):
         self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
-    def get_new_events_for_user(self, user, from_token, limit):
+    def get_new_events_for_user(self, user, from_key, limit):
         # We just ignore the key for now.
 
         to_key = yield self.get_current_token_part()
 
         events, end_key = yield self.store.get_room_events_stream(
             user_id=user.to_string(),
-            from_key=from_token.room_key,
+            from_key=from_key,
             to_key=to_key,
             room_id=None,
             limit=limit,
         )
 
-        end_token = from_token.copy_and_replace("room_key", end_key)
-
-        defer.returnValue((events, end_token))
+        defer.returnValue((events, end_key))
 
     def get_current_token_part(self):
         return self.store.get_room_events_max_id()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index ecb9318d1c..238b063483 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -151,8 +151,8 @@ class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
 
-    def get_new_events_for_user(self, user, from_token, limit):
-        return ([], from_token)
+    def get_new_events_for_user(self, user, from_key, limit):
+        return ([], from_key)
 
     def get_current_token_part(self):
         return 0
diff --git a/synapse/notifier.py b/synapse/notifier.py
index b6d5ec4820..cb544e9886 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -95,7 +95,7 @@ class Notifier(object):
         """
         room_id = event.room_id
 
-        source = self.event_sources.sources["room"]
+        room_source = self.event_sources.sources["room"]
 
         listeners = self.rooms_to_listeners.get(room_id, set()).copy()
 
@@ -107,13 +107,17 @@ class Notifier(object):
         # TODO (erikj): Can we make this more efficient by hitting the
         # db once?
         for listener in listeners:
-            events, end_token = yield source.get_new_events_for_user(
+            events, end_key = yield room_source.get_new_events_for_user(
                 listener.user,
-                listener.from_token,
+                listener.from_token.room_key,
                 listener.limit,
             )
 
             if events:
+                end_token = listener.from_token.copy_and_replace(
+                    "room_key", end_key
+                )
+
                 listener.notify(
                     self, events, listener.from_token, end_token
                 )
@@ -126,7 +130,7 @@ class Notifier(object):
 
         Will wake up all listeners for the given users and rooms.
         """
-        source = self.event_sources.sources["presence"]
+        presence_source = self.event_sources.sources["presence"]
 
         listeners = set()
 
@@ -137,13 +141,17 @@ class Notifier(object):
             listeners |= self.rooms_to_listeners.get(room, set()).copy()
 
         for listener in listeners:
-            events, end_token = yield source.get_new_events_for_user(
+            events, end_key = yield presence_source.get_new_events_for_user(
                 listener.user,
-                listener.from_token,
+                listener.from_token.presence_key,
                 listener.limit,
             )
 
             if events:
+                end_token = listener.from_token.copy_and_replace(
+                    "presence_key", end_key
+                )
+
                 listener.notify(
                     self, events, listener.from_token, end_token
                 )
@@ -216,16 +224,18 @@ class Notifier(object):
         limit = listener.limit
 
         # TODO (erikj): DeferredList?
-        for source in self.event_sources.sources.values():
-            stuff, new_token = yield source.get_new_events_for_user(
+        for name, source in self.event_sources.sources.items():
+            keyname = "%s_key" % name
+
+            stuff, new_key = yield source.get_new_events_for_user(
                 listener.user,
-                from_token,
+                getattr(from_token, keyname),
                 limit,
             )
 
             events.extend(stuff)
 
-            from_token = new_token
+            from_token = from_token.copy_and_replace(keyname, new_key)
 
         end_token = from_token
 
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 8480368673..43b6b1eba3 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -28,8 +28,8 @@ class NullSource(object):
     def __init__(self, hs):
         pass
 
-    def get_new_events_for_user(self, user, from_token, limit):
-        return defer.succeed(([], from_token))
+    def get_new_events_for_user(self, user, from_key, limit):
+        return defer.succeed(([], from_key))
 
     def get_current_token_part(self):
         return defer.succeed(0)
@@ -68,7 +68,8 @@ class EventSources(object):
 
 
 class StreamSource(object):
-    def get_new_events_for_user(self, user, from_token, limit):
+    def get_new_events_for_user(self, user, from_key, limit):
+        """from_key is the key within this event source."""
         raise NotImplementedError("get_new_events_for_user")
 
     def get_current_token_part(self):