summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-15 15:28:54 +0100
committerErik Johnston <erik@matrix.org>2014-08-15 15:28:54 +0100
commit01f089d9fbb9b89fa143ac44e51529fa8ed7ec12 (patch)
tree1f3544621b65749fd379fbb328a1745d775ef2fa
parentStart chagning the events stream to work with the new DB schema (diff)
downloadsynapse-01f089d9fbb9b89fa143ac44e51529fa8ed7ec12.tar.xz
Correctly return new token when returning events. Serialize events correctly.
-rw-r--r--synapse/api/notifier.py3
-rw-r--r--synapse/api/streams/event.py2
-rw-r--r--synapse/handlers/room.py5
-rw-r--r--synapse/storage/__init__.py6
-rw-r--r--synapse/storage/stream.py18
5 files changed, 25 insertions, 9 deletions
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
index 65b5a4ebb3..9f622df6bb 100644
--- a/synapse/api/notifier.py
+++ b/synapse/api/notifier.py
@@ -15,6 +15,7 @@
 
 from synapse.api.constants import Membership
 from synapse.api.events.room import RoomMemberEvent
+from synapse.api.streams.event import EventsStreamData
 
 from twisted.internet import defer
 from twisted.internet import reactor
@@ -66,7 +67,7 @@ class Notifier(object):
                 self._notify_and_callback(
                     user_id=user_id,
                     event_data=event.get_dict(),
-                    stream_type=event.type,
+                    stream_type=EventsStreamData.EVENT_TYPE,
                     store_id=store_id)
 
     def on_new_user_event(self, user_id, event_data, stream_type, store_id):
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 427363cad4..895a96b5b9 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -160,7 +160,7 @@ class EventStream(PaginationStream):
                 self.user_id, from_pkey, to_pkey, limit
             )
 
-            chunk += event_chunk
+            chunk += [e.get_dict() for e in event_chunk]
             next_ver.append(str(max_pkey))
 
         defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3451250008..9261984b7e 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -255,7 +255,10 @@ class MessageHandler(BaseHandler):
         ret = []
 
         for event in room_list:
-            d = event.get_dict()
+            d = {
+                "room_id": event.room_id,
+                "membership": event.membership,
+            }
             ret.append(d)
 
             if event.membership != Membership.JOIN:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 46b9dbcbbf..750e86040e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -57,7 +57,8 @@ class DataStore(RoomMemberStore, RoomStore,
         elif event.type == RoomTopicEvent.TYPE:
             yield self._store_room_topic(event)
 
-        yield self._store_event(event)
+        ret = yield self._store_event(event)
+        defer.returnValue(ret)
 
     @defer.inlineCallbacks
     def get_event(self, event_id):
@@ -114,6 +115,9 @@ class DataStore(RoomMemberStore, RoomStore,
                 }
             )
 
+        latest = yield self.get_room_events_max_id()
+        defer.returnValue(latest)
+
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
         sql = (
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index c5c3770a40..1300aee8b0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -43,7 +43,7 @@ class StreamStore(SQLBaseStore):
         )
 
         invites_sql = (
-            "SELECT m.event_id FROM room_membershipas as m "
+            "SELECT m.event_id FROM room_memberships as m "
             "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
             "WHERE m.user_id = ? AND m.membership = ?"
         )
@@ -55,8 +55,9 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT * FROM events as e WHERE "
-            "(room_id IN (%(current)s)) OR "
-            "(event_id IN (%(invites)s)) "
+            "((room_id IN (%(current)s)) OR "
+            "(event_id IN (%(invites)s))) "
+            " AND e.ordering > ? AND e.ordering < ? "
             "ORDER BY ordering ASC LIMIT %(limit)d"
         ) % {
             "current": current_room_membership_sql,
@@ -66,10 +67,17 @@ class StreamStore(SQLBaseStore):
 
         rows = yield self._execute_and_decode(
             sql,
-            user_id, user_id, Membership.INVITE
+            user_id, user_id, Membership.INVITE, from_key, to_key
         )
 
-        defer.returnValue([self._parse_event_from_row(r) for r in rows])
+        ret = [self._parse_event_from_row(r) for r in rows]
+
+        if ret:
+            max_id = max([r["ordering"] for r in rows])
+        else:
+            max_id = to_key
+
+        defer.returnValue((ret, max_id))
 
     @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, with_feedback=False):