diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1d3b02a9db..05bf145240 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -727,8 +727,8 @@ class PresenceEventSource(object):
self.hs = hs
self.clock = hs.get_clock()
- def get_new_events_for_user(self, user, from_token, limit):
- from_key = int(from_token.presence_key)
+ def get_new_events_for_user(self, user, from_key, limit):
+ from_key = int(from_key)
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
@@ -743,15 +743,9 @@ class PresenceEventSource(object):
latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
- end_token = from_token.copy_and_replace(
- "presence_key", latest_serial
- )
- return ((data, end_token))
+ return ((data, latest_serial))
else:
- end_token = from_token.copy_and_replace(
- "presence_key", presence._user_cachemap_latest_serial
- )
- return (([], end_token))
+ return (([], presence._user_cachemap_latest_serial))
def get_current_token_part(self):
presence = self.hs.get_handlers().presence_handler
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b27bdecd43..ce15420bf4 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -469,22 +469,20 @@ class RoomEventSource(object):
self.store = hs.get_datastore()
@defer.inlineCallbacks
- def get_new_events_for_user(self, user, from_token, limit):
+ def get_new_events_for_user(self, user, from_key, limit):
# We just ignore the key for now.
to_key = yield self.get_current_token_part()
events, end_key = yield self.store.get_room_events_stream(
user_id=user.to_string(),
- from_key=from_token.room_key,
+ from_key=from_key,
to_key=to_key,
room_id=None,
limit=limit,
)
- end_token = from_token.copy_and_replace("room_key", end_key)
-
- defer.returnValue((events, end_token))
+ defer.returnValue((events, end_key))
def get_current_token_part(self):
return self.store.get_room_events_max_id()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index ecb9318d1c..238b063483 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -151,8 +151,8 @@ class TypingNotificationEventSource(object):
def __init__(self, hs):
self.hs = hs
- def get_new_events_for_user(self, user, from_token, limit):
- return ([], from_token)
+ def get_new_events_for_user(self, user, from_key, limit):
+ return ([], from_key)
def get_current_token_part(self):
return 0
diff --git a/synapse/notifier.py b/synapse/notifier.py
index b6d5ec4820..cb544e9886 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -95,7 +95,7 @@ class Notifier(object):
"""
room_id = event.room_id
- source = self.event_sources.sources["room"]
+ room_source = self.event_sources.sources["room"]
listeners = self.rooms_to_listeners.get(room_id, set()).copy()
@@ -107,13 +107,17 @@ class Notifier(object):
# TODO (erikj): Can we make this more efficient by hitting the
# db once?
for listener in listeners:
- events, end_token = yield source.get_new_events_for_user(
+ events, end_key = yield room_source.get_new_events_for_user(
listener.user,
- listener.from_token,
+ listener.from_token.room_key,
listener.limit,
)
if events:
+ end_token = listener.from_token.copy_and_replace(
+ "room_key", end_key
+ )
+
listener.notify(
self, events, listener.from_token, end_token
)
@@ -126,7 +130,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms.
"""
- source = self.event_sources.sources["presence"]
+ presence_source = self.event_sources.sources["presence"]
listeners = set()
@@ -137,13 +141,17 @@ class Notifier(object):
listeners |= self.rooms_to_listeners.get(room, set()).copy()
for listener in listeners:
- events, end_token = yield source.get_new_events_for_user(
+ events, end_key = yield presence_source.get_new_events_for_user(
listener.user,
- listener.from_token,
+ listener.from_token.presence_key,
listener.limit,
)
if events:
+ end_token = listener.from_token.copy_and_replace(
+ "presence_key", end_key
+ )
+
listener.notify(
self, events, listener.from_token, end_token
)
@@ -216,16 +224,18 @@ class Notifier(object):
limit = listener.limit
# TODO (erikj): DeferredList?
- for source in self.event_sources.sources.values():
- stuff, new_token = yield source.get_new_events_for_user(
+ for name, source in self.event_sources.sources.items():
+ keyname = "%s_key" % name
+
+ stuff, new_key = yield source.get_new_events_for_user(
listener.user,
- from_token,
+ getattr(from_token, keyname),
limit,
)
events.extend(stuff)
- from_token = new_token
+ from_token = from_token.copy_and_replace(keyname, new_key)
end_token = from_token
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 8480368673..43b6b1eba3 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -28,8 +28,8 @@ class NullSource(object):
def __init__(self, hs):
pass
- def get_new_events_for_user(self, user, from_token, limit):
- return defer.succeed(([], from_token))
+ def get_new_events_for_user(self, user, from_key, limit):
+ return defer.succeed(([], from_key))
def get_current_token_part(self):
return defer.succeed(0)
@@ -68,7 +68,8 @@ class EventSources(object):
class StreamSource(object):
- def get_new_events_for_user(self, user, from_token, limit):
+ def get_new_events_for_user(self, user, from_key, limit):
+ """from_key is the key within this event source."""
raise NotImplementedError("get_new_events_for_user")
def get_current_token_part(self):
|