summary refs log tree commit diff
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-02-25 15:00:59 +0000
committerKegan Dougal <kegan@matrix.org>2015-02-25 15:00:59 +0000
commit2d20466f9a1349c97d5a3822eb4ee64f19bbdf27 (patch)
tree5d67e29854210f2ecd334c2809555d72d9db443d
parentturns uris config options should append since it's a list (diff)
downloadsynapse-2d20466f9a1349c97d5a3822eb4ee64f19bbdf27.tar.xz
Add stub functions and work out execution flow to implement AS event stream polling.
-rw-r--r--synapse/handlers/events.py3
-rw-r--r--synapse/handlers/room.py34
-rw-r--r--synapse/storage/appservice.py19
-rw-r--r--synapse/storage/stream.py21
4 files changed, 65 insertions, 12 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 025e7e7e62..8d5f5c8499 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler):
                         )
                 self._streams_per_user[auth_user] += 1
 
-            if pagin_config.from_token is None:
-                pagin_config.from_token = None
-
             rm_handler = self.hs.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 914742d913..a8b0c95636 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler):
     def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
         """Returns a list of roomids that the user has any of the given
         membership states in."""
-        rooms = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=user.to_string(), membership_list=membership_list
+
+        app_service = yield self.store.get_app_service_by_user_id(
+            user.to_string()
         )
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+        else:
+            rooms = yield self.store.get_rooms_for_user_where_membership_is(
+                user_id=user.to_string(), membership_list=membership_list
+            )
 
         # For some reason the list of events contains duplicates
         # TODO(paul): work out why because I really don't think it should
@@ -559,13 +566,22 @@ class RoomEventSource(object):
 
         to_key = yield self.get_current_key()
 
-        events, end_key = yield self.store.get_room_events_stream(
-            user_id=user.to_string(),
-            from_key=from_key,
-            to_key=to_key,
-            room_id=None,
-            limit=limit,
-        )
+        app_service = self.store.get_app_service_by_user_id(user.to_string())
+        if app_service:
+            events, end_key = yield self.store.get_appservice_room_stream(
+                service=app_service,
+                from_key=from_key,
+                to_key=to_key,
+                limit=limit,
+            )
+        else:
+            events, end_key = yield self.store.get_room_events_stream(
+                user_id=user.to_string(),
+                from_key=from_key,
+                to_key=to_key,
+                room_id=None,
+                limit=limit,
+            )
 
         defer.returnValue((events, end_key))
 
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index dc3666efd4..435ccfd6fc 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.appservice import ApplicationService
+from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
 
@@ -151,6 +152,16 @@ class ApplicationServiceStore(SQLBaseStore):
         defer.returnValue(self.services_cache)
 
     @defer.inlineCallbacks
+    def get_app_service_by_user_id(self, user_id):
+        yield self.cache_defer  # make sure the cache is ready
+
+        for service in self.services_cache:
+            if service.sender == user_id:
+                defer.returnValue(service)
+                return
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
     def get_app_service_by_token(self, token, from_cache=True):
         """Get the application service with the given token.
 
@@ -174,6 +185,14 @@ class ApplicationServiceStore(SQLBaseStore):
         # TODO: This should be JOINed with the application_services_regex table.
 
     @defer.inlineCallbacks
+    def get_app_service_rooms(self, service):
+        logger.info("get_app_service_rooms -> %s", service)
+
+        # TODO stub
+        yield self.cache_defer
+        defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")])
+
+    @defer.inlineCallbacks
     def _populate_cache(self):
         """Populates the ApplicationServiceCache from the database."""
         sql = ("SELECT * FROM application_services LEFT JOIN "
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3ccb6f8a61..aa3c9f8c9c 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -127,6 +127,27 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
 
 
 class StreamStore(SQLBaseStore):
+
+    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
+        # NB this lives here instead of appservice.py so we can reuse the
+        # 'private' StreamToken class in this file.
+        logger.info("get_appservice_room_stream -> %s", service)
+
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_id = _StreamToken.parse_stream_token(from_key)
+        to_id = _StreamToken.parse_stream_token(to_key)
+
+        if from_key == to_key:
+            return defer.succeed(([], to_key))
+
+        # TODO stub
+        return defer.succeed(([], to_key))
+
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
                                limit=0, with_feedback=False):