diff options
-rw-r--r-- | synapse/api/notifier.py | 3 | ||||
-rw-r--r-- | synapse/api/streams/event.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room.py | 5 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 6 | ||||
-rw-r--r-- | synapse/storage/stream.py | 18 |
5 files changed, 25 insertions, 9 deletions
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py index 65b5a4ebb3..9f622df6bb 100644 --- a/synapse/api/notifier.py +++ b/synapse/api/notifier.py @@ -15,6 +15,7 @@ from synapse.api.constants import Membership from synapse.api.events.room import RoomMemberEvent +from synapse.api.streams.event import EventsStreamData from twisted.internet import defer from twisted.internet import reactor @@ -66,7 +67,7 @@ class Notifier(object): self._notify_and_callback( user_id=user_id, event_data=event.get_dict(), - stream_type=event.type, + stream_type=EventsStreamData.EVENT_TYPE, store_id=store_id) def on_new_user_event(self, user_id, event_data, stream_type, store_id): diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 427363cad4..895a96b5b9 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -160,7 +160,7 @@ class EventStream(PaginationStream): self.user_id, from_pkey, to_pkey, limit ) - chunk += event_chunk + chunk += [e.get_dict() for e in event_chunk] next_ver.append(str(max_pkey)) defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3451250008..9261984b7e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -255,7 +255,10 @@ class MessageHandler(BaseHandler): ret = [] for event in room_list: - d = event.get_dict() + d = { + "room_id": event.room_id, + "membership": event.membership, + } ret.append(d) if event.membership != Membership.JOIN: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 46b9dbcbbf..750e86040e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,8 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomTopicEvent.TYPE: yield self._store_room_topic(event) - yield self._store_event(event) + ret = yield self._store_event(event) + defer.returnValue(ret) @defer.inlineCallbacks def get_event(self, event_id): @@ -114,6 +115,9 @@ class DataStore(RoomMemberStore, RoomStore, } ) + latest = yield self.get_room_events_max_id() + defer.returnValue(latest) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c5c3770a40..1300aee8b0 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,7 @@ class StreamStore(SQLBaseStore): ) invites_sql = ( - "SELECT m.event_id FROM room_membershipas as m " + "SELECT m.event_id FROM room_memberships as m " "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ? AND m.membership = ?" ) @@ -55,8 +55,9 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events as e WHERE " - "(room_id IN (%(current)s)) OR " - "(event_id IN (%(invites)s)) " + "((room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s))) " + " AND e.ordering > ? AND e.ordering < ? " "ORDER BY ordering ASC LIMIT %(limit)d" ) % { "current": current_room_membership_sql, @@ -66,10 +67,17 @@ class StreamStore(SQLBaseStore): rows = yield self._execute_and_decode( sql, - user_id, user_id, Membership.INVITE + user_id, user_id, Membership.INVITE, from_key, to_key ) - defer.returnValue([self._parse_event_from_row(r) for r in rows]) + ret = [self._parse_event_from_row(r) for r in rows] + + if ret: + max_id = max([r["ordering"] for r in rows]) + else: + max_id = to_key + + defer.returnValue((ret, max_id)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): |