diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
index 65b5a4ebb3..9f622df6bb 100644
--- a/synapse/api/notifier.py
+++ b/synapse/api/notifier.py
@@ -15,6 +15,7 @@
from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
+from synapse.api.streams.event import EventsStreamData
from twisted.internet import defer
from twisted.internet import reactor
@@ -66,7 +67,7 @@ class Notifier(object):
self._notify_and_callback(
user_id=user_id,
event_data=event.get_dict(),
- stream_type=event.type,
+ stream_type=EventsStreamData.EVENT_TYPE,
store_id=store_id)
def on_new_user_event(self, user_id, event_data, stream_type, store_id):
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 427363cad4..895a96b5b9 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -160,7 +160,7 @@ class EventStream(PaginationStream):
self.user_id, from_pkey, to_pkey, limit
)
- chunk += event_chunk
+ chunk += [e.get_dict() for e in event_chunk]
next_ver.append(str(max_pkey))
defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3451250008..9261984b7e 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -255,7 +255,10 @@ class MessageHandler(BaseHandler):
ret = []
for event in room_list:
- d = event.get_dict()
+ d = {
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
ret.append(d)
if event.membership != Membership.JOIN:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 46b9dbcbbf..750e86040e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -57,7 +57,8 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == RoomTopicEvent.TYPE:
yield self._store_room_topic(event)
- yield self._store_event(event)
+ ret = yield self._store_event(event)
+ defer.returnValue(ret)
@defer.inlineCallbacks
def get_event(self, event_id):
@@ -114,6 +115,9 @@ class DataStore(RoomMemberStore, RoomStore,
}
)
+ latest = yield self.get_room_events_max_id()
+ defer.returnValue(latest)
+
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
sql = (
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index c5c3770a40..1300aee8b0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -43,7 +43,7 @@ class StreamStore(SQLBaseStore):
)
invites_sql = (
- "SELECT m.event_id FROM room_membershipas as m "
+ "SELECT m.event_id FROM room_memberships as m "
"INNER JOIN current_state_events as c ON m.event_id = c.event_id "
"WHERE m.user_id = ? AND m.membership = ?"
)
@@ -55,8 +55,9 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT * FROM events as e WHERE "
- "(room_id IN (%(current)s)) OR "
- "(event_id IN (%(invites)s)) "
+ "((room_id IN (%(current)s)) OR "
+ "(event_id IN (%(invites)s))) "
+ " AND e.ordering > ? AND e.ordering < ? "
"ORDER BY ordering ASC LIMIT %(limit)d"
) % {
"current": current_room_membership_sql,
@@ -66,10 +67,17 @@ class StreamStore(SQLBaseStore):
rows = yield self._execute_and_decode(
sql,
- user_id, user_id, Membership.INVITE
+ user_id, user_id, Membership.INVITE, from_key, to_key
)
- defer.returnValue([self._parse_event_from_row(r) for r in rows])
+ ret = [self._parse_event_from_row(r) for r in rows]
+
+ if ret:
+ max_id = max([r["ordering"] for r in rows])
+ else:
+ max_id = to_key
+
+ defer.returnValue((ret, max_id))
@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
|