summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/notifier.py6
-rw-r--r--synapse/streams/events.py21
3 files changed, 15 insertions, 16 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a32c22db33..faea30b44e 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -114,7 +114,7 @@ class MessageHandler(BaseHandler):
         """
         yield self.auth.check_joined_room(room_id, user_id)
 
-        data_source = self.hs.get_event_sources().sources[0]
+        data_source = self.hs.get_event_sources().sources["room"]
 
         if not pagin_config.from_token:
             pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
@@ -274,7 +274,7 @@ class MessageHandler(BaseHandler):
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         # FIXME (erikj): Fix this.
-        presence_stream = self.hs.get_event_sources().sources[1]
+        presence_stream = self.hs.get_event_sources().sources["presence"]
         pagination_config = PaginationConfig(from_token=now_token)
         presence, _ = yield presence_stream.get_pagination_rows(
             user, pagination_config, None
diff --git a/synapse/notifier.py b/synapse/notifier.py
index a69d5343cb..b969011b32 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -69,7 +69,7 @@ class Notifier(object):
     def on_new_room_event(self, event, extra_users=[]):
         room_id = event.room_id
 
-        source = self.event_sources.sources[0]
+        source = self.event_sources.sources["room"]
 
         listeners = self.rooms_to_listeners.get(room_id, set()).copy()
 
@@ -94,7 +94,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def on_new_user_event(self, users=[], rooms=[]):
-        source = self.event_sources.sources[1]
+        source = self.event_sources.sources["presence"]
 
         listeners = set()
 
@@ -176,7 +176,7 @@ class Notifier(object):
         limit = listener.limit
 
         # TODO (erikj): DeferredList?
-        for source in self.event_sources.sources:
+        for source in self.event_sources.sources.values():
             stuff, new_token = yield source.get_new_events_for_user(
                 listener.user,
                 from_token,
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 8a84a9d392..2e6ea6ca26 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -20,8 +20,6 @@ from synapse.types import StreamToken
 
 
 class RoomEventSource(object):
-    SIGNAL_NAME = "RoomEventSource"
-
     def __init__(self, hs):
         self.store = hs.get_datastore()
 
@@ -70,8 +68,6 @@ class RoomEventSource(object):
 
 
 class PresenceSource(object):
-    SIGNAL_NAME = "PresenceSource"
-
     def __init__(self, hs):
         self.hs = hs
         self.clock = hs.get_clock()
@@ -150,13 +146,16 @@ class PresenceSource(object):
 
 
 class EventSources(object):
-    SOURCE_TYPES = [
-        RoomEventSource,
-        PresenceSource,
-    ]
+    SOURCE_TYPES = {
+        "room": RoomEventSource,
+        "presence": PresenceSource,
+    }
 
     def __init__(self, hs):
-        self.sources = [t(hs) for t in EventSources.SOURCE_TYPES]
+        self.sources = {
+            name: cls(hs)
+            for name, cls in EventSources.SOURCE_TYPES.items()
+        }
 
     @staticmethod
     def create_token(events_key, presence_key):
@@ -164,8 +163,8 @@ class EventSources(object):
 
     @defer.inlineCallbacks
     def get_current_token(self):
-        events_key = yield self.sources[0].get_current_token_part()
-        presence_key = yield self.sources[1].get_current_token_part()
+        events_key = yield self.sources["room"].get_current_token_part()
+        presence_key = yield self.sources["presence"].get_current_token_part()
         token = EventSources.create_token(events_key, presence_key)
         defer.returnValue(token)