From 20d0db6cfb8efce079376eb6bd2c8cca4f4cab16 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 29 Aug 2014 17:09:15 +0100 Subject: Move the *EventSource classes into the handlers they relate to, so it's easier to find the code --- synapse/handlers/room.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'synapse/handlers/room.py') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3e41d7a46b..6fbb4bc18d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -462,3 +462,51 @@ class RoomListHandler(BaseRoomHandler): chunk = yield self.store.get_rooms(is_public=True) # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) + + +class RoomEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_token, limit): + # We just ignore the key for now. + + to_key = yield self.get_current_token_part() + + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_token.events_key, + to_key=to_key, + room_id=None, + limit=limit, + ) + + end_token = from_token.copy_and_replace("events_key", end_key) + + defer.returnValue((events, end_token)) + + def get_current_token_part(self): + return self.store.get_room_events_max_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, pagination_config, key): + from_token = pagination_config.from_token + to_token = pagination_config.to_token + limit = pagination_config.limit + direction = pagination_config.direction + + to_key = to_token.events_key if to_token else None + + events, next_key = yield self.store.paginate_room_events( + room_id=key, + from_key=from_token.events_key, + to_key=to_key, + direction=direction, + limit=limit, + with_feedback=True + ) + + next_token = from_token.copy_and_replace("events_key", next_key) + + defer.returnValue((events, next_token)) -- cgit 1.5.1 From 4bfdec1eb28aa272391607b2cf1f24c781d9ba74 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 29 Aug 2014 18:39:09 +0100 Subject: Rename 'events_key' to 'room_key' so it matches the name of the event source --- synapse/handlers/message.py | 6 +++--- synapse/handlers/room.py | 10 +++++----- synapse/streams/events.py | 2 +- synapse/types.py | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) (limited to 'synapse/handlers/room.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3d7f97bcff..c8ff34e5f5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -274,11 +274,11 @@ class MessageHandler(BaseRoomHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.events_key, + end_token=now_token.room_key, ) - start_token = now_token.copy_and_replace("events_key", token[0]) - end_token = now_token.copy_and_replace("events_key", token[1]) + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) d["messages"] = { "chunk": [m.get_dict() for m in messages], diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6fbb4bc18d..b27bdecd43 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -476,13 +476,13 @@ class RoomEventSource(object): events, end_key = yield self.store.get_room_events_stream( user_id=user.to_string(), - from_key=from_token.events_key, + from_key=from_token.room_key, to_key=to_key, room_id=None, limit=limit, ) - end_token = from_token.copy_and_replace("events_key", end_key) + end_token = from_token.copy_and_replace("room_key", end_key) defer.returnValue((events, end_token)) @@ -496,17 +496,17 @@ class RoomEventSource(object): limit = pagination_config.limit direction = pagination_config.direction - to_key = to_token.events_key if to_token else None + to_key = to_token.room_key if to_token else None events, next_key = yield self.store.paginate_room_events( room_id=key, - from_key=from_token.events_key, + from_key=from_token.room_key, to_key=to_key, direction=direction, limit=limit, with_feedback=True ) - next_token = from_token.copy_and_replace("events_key", next_key) + next_token = from_token.copy_and_replace("room_key", next_key) defer.returnValue((events, next_token)) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 4bec6605bd..8480368673 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -54,7 +54,7 @@ class EventSources(object): @defer.inlineCallbacks def get_current_token(self): token = StreamToken( - events_key=( + room_key=( yield self.sources["room"].get_current_token_part() ), presence_key=( diff --git a/synapse/types.py b/synapse/types.py index d93b02a56e..1a9dceabf5 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -97,7 +97,7 @@ class RoomID(DomainSpecificString): class StreamToken( namedtuple( "Token", - ("events_key", "presence_key", "typing_key") + ("room_key", "presence_key", "typing_key") ) ): _SEPARATOR = "_" -- cgit 1.5.1 From eec67a675f7ea3545bfba79c6b753f63f7fd9b3b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 29 Aug 2014 19:13:55 +0100 Subject: Have EventSource's get_new_events_for_user() API work only on keys within that source, not overall eventstream tokens --- synapse/handlers/presence.py | 14 ++++---------- synapse/handlers/room.py | 8 +++----- synapse/handlers/typing.py | 4 ++-- synapse/notifier.py | 30 ++++++++++++++++++++---------- synapse/streams/events.py | 7 ++++--- 5 files changed, 33 insertions(+), 30 deletions(-) (limited to 'synapse/handlers/room.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1d3b02a9db..05bf145240 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -727,8 +727,8 @@ class PresenceEventSource(object): self.hs = hs self.clock = hs.get_clock() - def get_new_events_for_user(self, user, from_token, limit): - from_key = int(from_token.presence_key) + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap @@ -743,15 +743,9 @@ class PresenceEventSource(object): latest_serial = max([x[1].serial for x in updates]) data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - end_token = from_token.copy_and_replace( - "presence_key", latest_serial - ) - return ((data, end_token)) + return ((data, latest_serial)) else: - end_token = from_token.copy_and_replace( - "presence_key", presence._user_cachemap_latest_serial - ) - return (([], end_token)) + return (([], presence._user_cachemap_latest_serial)) def get_current_token_part(self): presence = self.hs.get_handlers().presence_handler diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b27bdecd43..ce15420bf4 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -469,22 +469,20 @@ class RoomEventSource(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def get_new_events_for_user(self, user, from_token, limit): + def get_new_events_for_user(self, user, from_key, limit): # We just ignore the key for now. to_key = yield self.get_current_token_part() events, end_key = yield self.store.get_room_events_stream( user_id=user.to_string(), - from_key=from_token.room_key, + from_key=from_key, to_key=to_key, room_id=None, limit=limit, ) - end_token = from_token.copy_and_replace("room_key", end_key) - - defer.returnValue((events, end_token)) + defer.returnValue((events, end_key)) def get_current_token_part(self): return self.store.get_room_events_max_id() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index ecb9318d1c..238b063483 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -151,8 +151,8 @@ class TypingNotificationEventSource(object): def __init__(self, hs): self.hs = hs - def get_new_events_for_user(self, user, from_token, limit): - return ([], from_token) + def get_new_events_for_user(self, user, from_key, limit): + return ([], from_key) def get_current_token_part(self): return 0 diff --git a/synapse/notifier.py b/synapse/notifier.py index b6d5ec4820..cb544e9886 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -95,7 +95,7 @@ class Notifier(object): """ room_id = event.room_id - source = self.event_sources.sources["room"] + room_source = self.event_sources.sources["room"] listeners = self.rooms_to_listeners.get(room_id, set()).copy() @@ -107,13 +107,17 @@ class Notifier(object): # TODO (erikj): Can we make this more efficient by hitting the # db once? for listener in listeners: - events, end_token = yield source.get_new_events_for_user( + events, end_key = yield room_source.get_new_events_for_user( listener.user, - listener.from_token, + listener.from_token.room_key, listener.limit, ) if events: + end_token = listener.from_token.copy_and_replace( + "room_key", end_key + ) + listener.notify( self, events, listener.from_token, end_token ) @@ -126,7 +130,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - source = self.event_sources.sources["presence"] + presence_source = self.event_sources.sources["presence"] listeners = set() @@ -137,13 +141,17 @@ class Notifier(object): listeners |= self.rooms_to_listeners.get(room, set()).copy() for listener in listeners: - events, end_token = yield source.get_new_events_for_user( + events, end_key = yield presence_source.get_new_events_for_user( listener.user, - listener.from_token, + listener.from_token.presence_key, listener.limit, ) if events: + end_token = listener.from_token.copy_and_replace( + "presence_key", end_key + ) + listener.notify( self, events, listener.from_token, end_token ) @@ -216,16 +224,18 @@ class Notifier(object): limit = listener.limit # TODO (erikj): DeferredList? - for source in self.event_sources.sources.values(): - stuff, new_token = yield source.get_new_events_for_user( + for name, source in self.event_sources.sources.items(): + keyname = "%s_key" % name + + stuff, new_key = yield source.get_new_events_for_user( listener.user, - from_token, + getattr(from_token, keyname), limit, ) events.extend(stuff) - from_token = new_token + from_token = from_token.copy_and_replace(keyname, new_key) end_token = from_token diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 8480368673..43b6b1eba3 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -28,8 +28,8 @@ class NullSource(object): def __init__(self, hs): pass - def get_new_events_for_user(self, user, from_token, limit): - return defer.succeed(([], from_token)) + def get_new_events_for_user(self, user, from_key, limit): + return defer.succeed(([], from_key)) def get_current_token_part(self): return defer.succeed(0) @@ -68,7 +68,8 @@ class EventSources(object): class StreamSource(object): - def get_new_events_for_user(self, user, from_token, limit): + def get_new_events_for_user(self, user, from_key, limit): + """from_key is the key within this event source.""" raise NotImplementedError("get_new_events_for_user") def get_current_token_part(self): -- cgit 1.5.1 From a8e8d1d06c078de49711768357267cf4168999ea Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 29 Aug 2014 19:15:23 +0100 Subject: Renamed get_current_token_part to get_current_key --- synapse/handlers/presence.py | 2 +- synapse/handlers/room.py | 4 ++-- synapse/handlers/typing.py | 2 +- synapse/streams/events.py | 12 ++++++------ 4 files changed, 10 insertions(+), 10 deletions(-) (limited to 'synapse/handlers/room.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 05bf145240..cc28151e35 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -747,7 +747,7 @@ class PresenceEventSource(object): else: return (([], presence._user_cachemap_latest_serial)) - def get_current_token_part(self): + def get_current_key(self): presence = self.hs.get_handlers().presence_handler return presence._user_cachemap_latest_serial diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ce15420bf4..c54e0f963b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -472,7 +472,7 @@ class RoomEventSource(object): def get_new_events_for_user(self, user, from_key, limit): # We just ignore the key for now. - to_key = yield self.get_current_token_part() + to_key = yield self.get_current_key() events, end_key = yield self.store.get_room_events_stream( user_id=user.to_string(), @@ -484,7 +484,7 @@ class RoomEventSource(object): defer.returnValue((events, end_key)) - def get_current_token_part(self): + def get_current_key(self): return self.store.get_room_events_max_id() @defer.inlineCallbacks diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 238b063483..3268427ecd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -154,7 +154,7 @@ class TypingNotificationEventSource(object): def get_new_events_for_user(self, user, from_key, limit): return ([], from_key) - def get_current_token_part(self): + def get_current_key(self): return 0 def get_pagination_rows(self, user, pagination_config, key): diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 43b6b1eba3..08d6e6f733 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -31,7 +31,7 @@ class NullSource(object): def get_new_events_for_user(self, user, from_key, limit): return defer.succeed(([], from_key)) - def get_current_token_part(self): + def get_current_key(self): return defer.succeed(0) def get_pagination_rows(self, user, pagination_config, key): @@ -55,13 +55,13 @@ class EventSources(object): def get_current_token(self): token = StreamToken( room_key=( - yield self.sources["room"].get_current_token_part() + yield self.sources["room"].get_current_key() ), presence_key=( - yield self.sources["presence"].get_current_token_part() + yield self.sources["presence"].get_current_key() ), typing_key=( - yield self.sources["typing"].get_current_token_part() + yield self.sources["typing"].get_current_key() ) ) defer.returnValue(token) @@ -72,8 +72,8 @@ class StreamSource(object): """from_key is the key within this event source.""" raise NotImplementedError("get_new_events_for_user") - def get_current_token_part(self): - raise NotImplementedError("get_current_token_part") + def get_current_key(self): + raise NotImplementedError("get_current_key") def get_pagination_rows(self, user, pagination_config, key): raise NotImplementedError("get_rows") -- cgit 1.5.1