From 661c7117659118ed977f56a092525dbdae9dc67c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 17:34:37 +0100 Subject: Start fixing places that use the data store. --- synapse/handlers/room.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index eae40765b3..a9ff2d93f1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -141,12 +141,7 @@ class MessageHandler(BaseHandler): yield self.state_handler.handle_new_event(event) # store in db - store_id = yield self.store.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + store_id = yield self.store.persist_event(event) event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id @@ -201,19 +196,15 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_room_data(room_id, event_type, state_key) + data = yield self.store.get_current_state(room_id, event_type, state_key) defer.returnValue(data) @defer.inlineCallbacks - def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None, - user_id=None, fb_sender_id=None, fb_type=None): + def get_feedback(self, event_id): yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db - fb = yield self.store.get_feedback( - room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, fb_type=fb_type - ) + fb = yield self.store.get_feedback(event_id) if fb: defer.returnValue(fb) -- cgit 1.4.1 From 2c46bb620828efaebdbae37e5212a28b505ee72d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 18:40:50 +0100 Subject: Fix up typos and correct sql queries --- synapse/handlers/room.py | 10 ++-------- synapse/storage/__init__.py | 20 ++++++++++---------- synapse/storage/_base.py | 9 +++++---- synapse/storage/roommember.py | 26 +++++++++++++------------- synapse/storage/schema/im.sql | 4 ++-- synapse/storage/stream.py | 11 ++++------- 6 files changed, 36 insertions(+), 44 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a9ff2d93f1..9b55206e47 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -201,7 +201,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_feedback(self, event_id): - yield self.auth.check_joined_room(room_id, user_id) + # yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db fb = yield self.store.get_feedback(event_id) @@ -690,13 +690,7 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): # store membership - store_id = yield self.store.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=membership - ) + store_id = yield self.store.persist_event(event) # Send a PDU to all hosts who have joined the room. destinations = yield self.store.get_joined_hosts_for_room( diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 182b6ebadd..f41c21dcd2 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, @@ -52,7 +53,7 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomConfigEvent.TYPE: yield self._store_room_config(event) - self._store_event(event) + yield self._store_event(event) @defer.inlineCallbacks def get_event(self, event_id): @@ -76,14 +77,13 @@ class DataStore(RoomMemberStore, RoomStore, def _store_event(self, event): vals = { "event_id": event.event_id, - "event_type": event.type, - "sender": event.user_id, + "type": event.type, "room_id": event.room_id, "content": json.dumps(event.content), } - unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} - val["unrecognized_keys"] = json.dumps(unrec) + unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()} + vals["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) @@ -91,7 +91,7 @@ class DataStore(RoomMemberStore, RoomStore, vals = { "event_id": event.event_id, "room_id": event.room_id, - "event_type": event.event_type, + "type": event.type, "state_key": event.state_key, } @@ -103,16 +103,16 @@ class DataStore(RoomMemberStore, RoomStore, # TODO (erikj): We also need to update the current state table? @defer.inlineCallbacks - def get_current_state(room_id, event_type=None, state_key=""): + def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( - "SELECT e.* FROM events as e" - "INNER JOIN current_state as c ON e.event_id = c.event_id " + "SELECT e.* FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " "INNER JOIN state_events as s ON e.event_id = s.event_id " "WHERE c.room_id = ? " ) if event_type: - sql += " s.type = ? AND s.state_key = ? " + sql += " AND s.type = ? AND s.state_key = ? " args = (room_id, event_type, state_key) else: args = (room_id, ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 533f509709..c8ec63f30a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError import collections +import copy import json @@ -59,7 +60,7 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ + "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ if decoder else None ) def interaction(txn): @@ -72,7 +73,7 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(interaction) def _execute_and_decode(self, query, *args): - return self._execute(self.cursor_to_dict, *args) + return self._execute(self.cursor_to_dict, query, *args) # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -291,8 +292,8 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(func) def _parse_event_from_row(self, row_dict): - d = copy.deepcopy({k: v for k, v in row.items() if v}) - d.update(json.loads(json.loads(row["unrecognized_keys"]))) + d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + d.update(json.loads(json.loads(row_dict["unrecognized_keys"]))) d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 14c0152e8a..8c4b04f190 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -54,13 +54,13 @@ class RoomMemberStore(SQLBaseStore): "INSERT OR IGNORE INTO room_hosts (room_id, host) " "VALUES (?, ?)" ) - yield self._execute(None, sql, room_id, domain) + yield self._execute(None, sql, event.room_id, domain) else: sql = ( "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" ) - yield self._execute(None, sql, room_id, domain) + yield self._execute(None, sql, event.room_id, domain) def get_room_member(self, user_id, room_id): @@ -72,10 +72,10 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - return self._get_members_by_dict( - room_id=room_id, - user_id=user_id - ) + return self._get_members_by_dict({ + "e.room_id": room_id, + "m.user_id": user_id, + }) def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -89,11 +89,11 @@ class RoomMemberStore(SQLBaseStore): list of namedtuples representing the members in this room. """ - where = {"room_id": room_id} + where = {"m.room_id": room_id} if membership: - where["membership"] = membership + where["m.membership"] = membership - return self._get_members_by_dict(**membership) + return self._get_members_by_dict(where) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -126,8 +126,8 @@ class RoomMemberStore(SQLBaseStore): ) def _get_members_by_dict(self, where_dict): - clause = " AND ".join("%s = ?" % k for k in where.keys()) - vals = where.values() + clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) + vals = where_dict.values() return self._get_members_query(clause, vals) @defer.inlineCallbacks @@ -136,11 +136,11 @@ class RoomMemberStore(SQLBaseStore): "SELECT e.* FROM events as e " "INNER JOIN room_memberships as m " "ON e.event_id = m.event_id " - "INNER JOIN current_state as c " + "INNER JOIN current_state_events as c " "ON m.event_id = c.event_id " "WHERE %s " ) % (where_clause,) - rows = yield self._execute_and_decode(sql, where_values) + rows = yield self._execute_and_decode(sql, *where_values) results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 7f564c8540..85c0c7119c 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -30,9 +30,9 @@ CREATE TABLE IF NOT EXISTS state_events( prev_state TEXT ); -CREATE TABLE IF NOT EXISTS current_state( +CREATE TABLE IF NOT EXISTS current_state_events( event_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1456d216f0..9937239c22 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -13,12 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from ._base import SQLBaseStore -from .message import MessagesTable -from .feedback import FeedbackTable -from .roomdata import RoomDataTable -from .roommember import RoomMemberTable from synapse.api.constants import Membership @@ -40,13 +37,13 @@ class StreamStore(SQLBaseStore): current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state as c ON m.event_id = c.event_id " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ?" ) invites_sql = ( "SELECT m.event_id FROM room_membershipas as m " - "INNER JOIN current_state as c ON m.event_id = c.event_id " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ? AND m.membership = ?" ) @@ -71,4 +68,4 @@ class StreamStore(SQLBaseStore): user_id, user_id, Membership.INVITE ) - defer.returnValue([self._parse_event_from_row(r) for r in results]) + defer.returnValue([self._parse_event_from_row(r) for r in rows]) -- cgit 1.4.1 From 5002efa31bb57a92b87b9d7319641d9b5a2a6047 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 10:26:35 +0100 Subject: Reimplement the get public rooms api to work with new DB schema --- synapse/api/events/factory.py | 3 +- synapse/api/events/room.py | 23 +++++++++++ synapse/handlers/room.py | 2 +- synapse/storage/__init__.py | 6 ++- synapse/storage/_base.py | 2 +- synapse/storage/room.py | 90 +++++++++++++++++++++++++++---------------- synapse/storage/schema/im.sql | 12 ++++++ 7 files changed, 101 insertions(+), 37 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 12aa04fc6e..23d2b0401c 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -15,7 +15,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, - InviteJoinEvent, RoomConfigEvent + InviteJoinEvent, RoomConfigEvent, RoomNameEvent, ) from synapse.util.stringutils import random_string @@ -25,6 +25,7 @@ class EventFactory(object): _event_classes = [ RoomTopicEvent, + RoomNameEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index f3df849af2..8136d495d5 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -19,14 +19,37 @@ from . import SynapseEvent class RoomTopicEvent(SynapseEvent): TYPE = "m.room.topic" + internal_keys = SynapseEvent.internal_keys + [ + "topic", + ] + def __init__(self, **kwargs): kwargs["state_key"] = "" + if "topic" in kwargs["content"]: + kwargs["topic"] = kwargs["content"]["topic"] super(RoomTopicEvent, self).__init__(**kwargs) def get_content_template(self): return {"topic": u"string"} +class RoomNameEvent(SynapseEvent): + TYPE = "m.room.name" + + internal_keys = SynapseEvent.internal_keys + [ + "name", + ] + + def __init__(self, **kwargs): + kwargs["state_key"] = "" + if "name" in kwargs["content"]: + kwargs["name"] = kwargs["content"]["name"] + super(RoomNameEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {"name": u"string"} + + class RoomMemberEvent(SynapseEvent): TYPE = "m.room.member" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9b55206e47..5c1b59dbc9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -790,5 +790,5 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): - chunk = yield self.store.get_rooms(is_public=True, with_topics=True) + chunk = yield self.store.get_rooms(is_public=True) defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f41c21dcd2..f62cee3c39 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, - RoomConfigEvent + RoomConfigEvent, RoomNameEvent, ) from .directory import DirectoryStore @@ -52,6 +52,10 @@ class DataStore(RoomMemberStore, RoomStore, yield self._store_feedback(event) elif event.type == RoomConfigEvent.TYPE: yield self._store_room_config(event) + elif event.type == RoomNameEvent.TYPE: + yield self._store_room_name(event) + elif event.type == RoomTopicEvent.TYPE: + yield self._store_room_topic(event) yield self._store_event(event) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c8ec63f30a..c26e9a0f98 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -68,7 +68,7 @@ class SQLBaseStore(object): if decoder: return decoder(cursor) else: - return cursor + return cursor.fetchall() return self._db_pool.runInteraction(interaction) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a97162831b..8f44b67d8c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -76,49 +76,73 @@ class RoomStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_rooms(self, is_public, with_topics): + def get_rooms(self, is_public): """Retrieve a list of all public rooms. Args: is_public (bool): True if the rooms returned should be public. - with_topics (bool): True to include the current topic for the room - in the response. Returns: - A list of room dicts containing at least a "room_id" key, and a - "topic" key if one is set and with_topic=True. + A list of room dicts containing at least a "room_id" key, a + "topic" key if one is set, and a "name" key if one is set """ - room_data_type = RoomTopicEvent.TYPE - public = 1 if is_public else 0 - - latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE " - + "room_data.type = ? GROUP BY room_id") - - query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms " - + "LEFT JOIN " - + "room_aliases ON room_aliases.room_id = rooms.room_id " - + "LEFT JOIN " - + "room_data ON rooms.room_id = room_data.room_id WHERE " - + "(room_data.id IN (" + latest_topic + ") " - + "OR room_data.id IS NULL) AND rooms.is_public = ?") - - res = yield self._execute( - self.cursor_to_dict, query, room_data_type, public + + topic_subquery = ( + "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics " + "INNER JOIN current_state_events as c " + "ON c.event_id = topics.event_id " ) - # return only the keys the specification expects - ret_keys = ["room_id", "topic", "room_alias"] + name_subquery = ( + "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names " + "INNER JOIN current_state_events as c " + "ON c.event_id = room_names.event_id " + ) - # extract topic from the json (icky) FIXME - for i, room_row in enumerate(res): - try: - content_json = json.loads(room_row["content"]) - room_row["topic"] = content_json["topic"] - except: - pass # no topic set - # filter the dict based on ret_keys - res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys} + sql = ( + "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r " + "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " + "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " + "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " + "WHERE r.is_public = ? " + "GROUP BY r.room_id " + ) % { + "topic": topic_subquery, + "name": name_subquery, + } + + rows = yield self._execute(None, sql, is_public) + + ret = [ + { + "room_id": r[0], + "name": r[1], + "topic": r[2], + "aliases": r[3].split(","), + } + for r in rows + ] + + defer.returnValue(ret) + + def _store_room_topic(self, event): + return self._simple_insert( + "topics", + { + "event_id": event.event_id, + "room_id": event.room_id, + "topic": event.topic, + } + ) - defer.returnValue(res) + def _store_room_name(self, event): + return self._simple_insert( + "room_names", + { + "event_id": event.event_id, + "room_id": event.room_id, + "name": event.name, + } + ) class RoomsTable(Table): diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 85c0c7119c..9a0f2145d5 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -51,6 +51,18 @@ CREATE TABLE IF NOT EXISTS feedback( room_id TEXT ); +CREATE TABLE IF NOT EXISTS topics( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + topic TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS room_names( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + name TEXT NOT NULL +); + CREATE TABLE IF NOT EXISTS rooms( room_id TEXT PRIMARY KEY NOT NULL, is_public INTEGER, -- cgit 1.4.1 From 114984a2361ee41005a769f6dc127c470ee08aee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 13:58:28 +0100 Subject: Start chagning the events stream to work with the new DB schema --- synapse/api/streams/event.py | 77 +++-------------------------------------- synapse/handlers/events.py | 8 ++--- synapse/handlers/room.py | 79 +++++++++++++++++++++++-------------------- synapse/storage/__init__.py | 10 +++++- synapse/storage/_base.py | 2 +- synapse/storage/roommember.py | 8 ++++- synapse/storage/schema/im.sql | 5 ++- synapse/storage/stream.py | 31 +++++++++++++++++ 8 files changed, 102 insertions(+), 118 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 4b6d739e54..427363cad4 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -28,17 +28,17 @@ import logging logger = logging.getLogger(__name__) -class MessagesStreamData(StreamData): - EVENT_TYPE = MessageEvent.TYPE +class EventsStreamData(StreamData): + EVENT_TYPE = "EventsStream" def __init__(self, hs, room_id=None, feedback=False): - super(MessagesStreamData, self).__init__(hs) + super(EventsStreamData, self).__init__(hs) self.room_id = room_id self.with_feedback = feedback @defer.inlineCallbacks def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_message_stream( + data, latest_ver = yield self.store.get_room_events_stream( user_id=user_id, from_key=from_key, to_key=to_key, @@ -50,74 +50,7 @@ class MessagesStreamData(StreamData): @defer.inlineCallbacks def max_token(self): - val = yield self.store.get_max_message_id() - defer.returnValue(val) - - -class RoomMemberStreamData(StreamData): - EVENT_TYPE = RoomMemberEvent.TYPE - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_member_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key - ) - - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_member_id() - defer.returnValue(val) - - -class FeedbackStreamData(StreamData): - EVENT_TYPE = FeedbackEvent.TYPE - - def __init__(self, hs, room_id=None): - super(FeedbackStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_feedback_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_feedback_id() - defer.returnValue(val) - - -class RoomDataStreamData(StreamData): - EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types - - def __init__(self, hs, room_id=None): - super(RoomDataStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_data_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_data_id() + val = yield self.store.get_room_events_max_id() defer.returnValue(val) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 3af7d824a2..6bb797caf2 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,8 +17,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.streams.event import ( - EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData, - RoomDataStreamData + EventStream, EventsStreamData ) from synapse.handlers.presence import PresenceStreamData @@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData class EventStreamHandler(BaseHandler): stream_data_classes = [ - MessagesStreamData, - RoomMemberStreamData, - FeedbackStreamData, - RoomDataStreamData, + EventsStreamData, PresenceStreamData, ] diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 432d13982a..3451250008 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, MessagesStreamData +from synapse.api.streams.event import EventStream, EventsStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -97,30 +97,30 @@ class MessageHandler(BaseHandler): self.notifier.on_new_room_event(event, store_id) yield self.hs.get_federation().handle_new_event(event) - - @defer.inlineCallbacks - def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False): - """Get messages in a room. - - Args: - user_id (str): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - feedback (bool): True to get compressed feedback with the messages - Returns: - dict: Pagination API results - """ - yield self.auth.check_joined_room(room_id, user_id) - - data_source = [MessagesStreamData(self.hs, room_id=room_id, - feedback=feedback)] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) - +# +# @defer.inlineCallbacks +# def get_messages(self, user_id=None, room_id=None, pagin_config=None, +# feedback=False): +# """Get messages in a room. +# +# Args: +# user_id (str): The user requesting messages. +# room_id (str): The room they want messages from. +# pagin_config (synapse.api.streams.PaginationConfig): The pagination +# config rules to apply, if any. +# feedback (bool): True to get compressed feedback with the messages +# Returns: +# dict: Pagination API results +# """ +# yield self.auth.check_joined_room(room_id, user_id) +# +# data_source = [MessagesStreamData(self.hs, room_id=room_id, +# feedback=feedback)] +# event_stream = EventStream(user_id, data_source) +# pagin_config = yield event_stream.fix_tokens(pagin_config) +# data_chunk = yield event_stream.get_chunk(config=pagin_config) +# defer.returnValue(data_chunk) +# @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): """ Stores data for a room. @@ -251,20 +251,27 @@ class MessageHandler(BaseHandler): user_id=user_id, membership_list=[Membership.INVITE, Membership.JOIN] ) - for room_info in room_list: - if room_info["membership"] != Membership.JOIN: + + ret = [] + + for event in room_list: + d = event.get_dict() + ret.append(d) + + if event.membership != Membership.JOIN: continue try: - event_chunk = yield self.get_messages( - user_id=user_id, - pagin_config=pagin_config, - feedback=feedback, - room_id=room_info["room_id"] + messages = yield self.store.get_recent_events_for_room( + event.room_id, + limit=50, ) - room_info["messages"] = event_chunk + d["messages"] = [m.get_dict() for m in messages] except: pass - defer.returnValue(room_list) + + logger.debug("snapshot_all_rooms returning: %s", ret) + + defer.returnValue(ret) class RoomCreationHandler(BaseHandler): @@ -442,7 +449,7 @@ class RoomMemberHandler(BaseHandler): member_list = yield self.store.get_room_members(room_id=room_id) event_list = [ - entry.as_event(self.event_factory).get_dict() + entry.get_dict() for entry in member_list ] chunk_data = { @@ -685,7 +692,7 @@ class RoomMemberHandler(BaseHandler): user_id=user.to_string(), membership_list=membership_list ) - defer.returnValue([r["room_id"] for r in rooms]) + defer.returnValue([r.room_id for r in rooms]) @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f62cee3c39..46b9dbcbbf 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -104,7 +104,15 @@ class DataStore(RoomMemberStore, RoomStore, yield self._simple_insert("state_events", vals) - # TODO (erikj): We also need to update the current state table? + yield self._simple_insert( + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + ) @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c26e9a0f98..413838f798 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -293,7 +293,7 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) - d.update(json.loads(json.loads(row_dict["unrecognized_keys"]))) + d.update(json.loads(row_dict["unrecognized_keys"])) d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8c4b04f190..a0620677b9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -63,6 +63,7 @@ class RoomMemberStore(SQLBaseStore): yield self._execute(None, sql, event.room_id, domain) + @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -72,11 +73,13 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - return self._get_members_by_dict({ + rows = yield self._get_members_by_dict({ "e.room_id": room_id, "m.user_id": user_id, }) + defer.returnValue(rows[0] if rows else None) + def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -142,5 +145,8 @@ class RoomMemberStore(SQLBaseStore): ) % (where_clause,) rows = yield self._execute_and_decode(sql, *where_values) + + logger.debug("_get_members_query Got rows %s", rows) + results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 9a0f2145d5..2452890ea4 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -32,7 +32,10 @@ CREATE TABLE IF NOT EXISTS state_events( CREATE TABLE IF NOT EXISTS current_state_events( event_id TEXT NOT NULL, - room_id TEXT NOT NULL + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 9937239c22..c5c3770a40 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -34,6 +34,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): + # TODO (erikj): Handle compressed feedback current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " @@ -69,3 +70,33 @@ class StreamStore(SQLBaseStore): ) defer.returnValue([self._parse_event_from_row(r) for r in rows]) + + @defer.inlineCallbacks + def get_recent_events_for_room(self, room_id, limit, with_feedback=False): + # TODO (erikj): Handle compressed feedback + + sql = ( + "SELECT * FROM events WHERE room_id = ? " + "ORDER BY ordering DESC LIMIT ? " + ) + + rows = yield self._execute_and_decode( + sql, + room_id, limit + ) + + rows.reverse() # As we selected with reverse ordering + + defer.returnValue([self._parse_event_from_row(r) for r in rows]) + + @defer.inlineCallbacks + def get_room_events_max_id(self): + res = yield self._execute_and_decode( + "SELECT MAX(ordering) as m FROM events" + ) + + if not res: + defer.returnValue(0) + return + + defer.returnValue(res[0]["m"]) -- cgit 1.4.1 From 01f089d9fbb9b89fa143ac44e51529fa8ed7ec12 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 15:28:54 +0100 Subject: Correctly return new token when returning events. Serialize events correctly. --- synapse/api/notifier.py | 3 ++- synapse/api/streams/event.py | 2 +- synapse/handlers/room.py | 5 ++++- synapse/storage/__init__.py | 6 +++++- synapse/storage/stream.py | 18 +++++++++++++----- 5 files changed, 25 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py index 65b5a4ebb3..9f622df6bb 100644 --- a/synapse/api/notifier.py +++ b/synapse/api/notifier.py @@ -15,6 +15,7 @@ from synapse.api.constants import Membership from synapse.api.events.room import RoomMemberEvent +from synapse.api.streams.event import EventsStreamData from twisted.internet import defer from twisted.internet import reactor @@ -66,7 +67,7 @@ class Notifier(object): self._notify_and_callback( user_id=user_id, event_data=event.get_dict(), - stream_type=event.type, + stream_type=EventsStreamData.EVENT_TYPE, store_id=store_id) def on_new_user_event(self, user_id, event_data, stream_type, store_id): diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 427363cad4..895a96b5b9 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -160,7 +160,7 @@ class EventStream(PaginationStream): self.user_id, from_pkey, to_pkey, limit ) - chunk += event_chunk + chunk += [e.get_dict() for e in event_chunk] next_ver.append(str(max_pkey)) defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3451250008..9261984b7e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -255,7 +255,10 @@ class MessageHandler(BaseHandler): ret = [] for event in room_list: - d = event.get_dict() + d = { + "room_id": event.room_id, + "membership": event.membership, + } ret.append(d) if event.membership != Membership.JOIN: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 46b9dbcbbf..750e86040e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,8 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomTopicEvent.TYPE: yield self._store_room_topic(event) - yield self._store_event(event) + ret = yield self._store_event(event) + defer.returnValue(ret) @defer.inlineCallbacks def get_event(self, event_id): @@ -114,6 +115,9 @@ class DataStore(RoomMemberStore, RoomStore, } ) + latest = yield self.get_room_events_max_id() + defer.returnValue(latest) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c5c3770a40..1300aee8b0 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,7 @@ class StreamStore(SQLBaseStore): ) invites_sql = ( - "SELECT m.event_id FROM room_membershipas as m " + "SELECT m.event_id FROM room_memberships as m " "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ? AND m.membership = ?" ) @@ -55,8 +55,9 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events as e WHERE " - "(room_id IN (%(current)s)) OR " - "(event_id IN (%(invites)s)) " + "((room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s))) " + " AND e.ordering > ? AND e.ordering < ? " "ORDER BY ordering ASC LIMIT %(limit)d" ) % { "current": current_room_membership_sql, @@ -66,10 +67,17 @@ class StreamStore(SQLBaseStore): rows = yield self._execute_and_decode( sql, - user_id, user_id, Membership.INVITE + user_id, user_id, Membership.INVITE, from_key, to_key ) - defer.returnValue([self._parse_event_from_row(r) for r in rows]) + ret = [self._parse_event_from_row(r) for r in rows] + + if ret: + max_id = max([r["ordering"] for r in rows]) + else: + max_id = to_key + + defer.returnValue((ret, max_id)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): -- cgit 1.4.1 From 8d1f7632095b949d7726dd72fce10224764f3c11 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 15:53:06 +0100 Subject: Fix pagination to work with new db schema --- synapse/handlers/room.py | 48 +++++++++++++++++++++++------------------------ synapse/storage/stream.py | 31 +++++++++++++++++++++++------- 2 files changed, 48 insertions(+), 31 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9261984b7e..b0b2441b9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -97,30 +97,30 @@ class MessageHandler(BaseHandler): self.notifier.on_new_room_event(event, store_id) yield self.hs.get_federation().handle_new_event(event) -# -# @defer.inlineCallbacks -# def get_messages(self, user_id=None, room_id=None, pagin_config=None, -# feedback=False): -# """Get messages in a room. -# -# Args: -# user_id (str): The user requesting messages. -# room_id (str): The room they want messages from. -# pagin_config (synapse.api.streams.PaginationConfig): The pagination -# config rules to apply, if any. -# feedback (bool): True to get compressed feedback with the messages -# Returns: -# dict: Pagination API results -# """ -# yield self.auth.check_joined_room(room_id, user_id) -# -# data_source = [MessagesStreamData(self.hs, room_id=room_id, -# feedback=feedback)] -# event_stream = EventStream(user_id, data_source) -# pagin_config = yield event_stream.fix_tokens(pagin_config) -# data_chunk = yield event_stream.get_chunk(config=pagin_config) -# defer.returnValue(data_chunk) -# + + @defer.inlineCallbacks + def get_messages(self, user_id=None, room_id=None, pagin_config=None, + feedback=False): + """Get messages in a room. + + Args: + user_id (str): The user requesting messages. + room_id (str): The room they want messages from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + feedback (bool): True to get compressed feedback with the messages + Returns: + dict: Pagination API results + """ + yield self.auth.check_joined_room(room_id, user_id) + + data_source = [EventsStreamData(self.hs, room_id=room_id, + feedback=feedback)] + event_stream = EventStream(user_id, data_source) + pagin_config = yield event_stream.fix_tokens(pagin_config) + data_chunk = yield event_stream.get_chunk(config=pagin_config) + defer.returnValue(data_chunk) + @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): """ Stores data for a room. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1300aee8b0..6bfa00d59a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -53,18 +53,35 @@ class StreamStore(SQLBaseStore): else: limit = 1000 + # From and to keys should be integers from ordering. + from_key = int(from_key) + to_key = int(to_key) + + if from_key == to_key: + defer.returnValue(([], to_key)) + return + + sql = ( "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " - " AND e.ordering > ? AND e.ordering < ? " - "ORDER BY ordering ASC LIMIT %(limit)d" ) % { "current": current_room_membership_sql, "invites": invites_sql, - "limit": limit, } + if from_key < to_key: + sql += ( + "AND e.ordering > ? AND e.ordering < ? " + "ORDER BY ordering ASC LIMIT %(limit)d " + ) % {"limit": limit} + else: + sql += ( + "AND e.ordering < ? AND e.ordering > ? " + "ORDER BY ordering DESC LIMIT %(limit)d " + ) % {"limit": int(limit)} + rows = yield self._execute_and_decode( sql, user_id, user_id, Membership.INVITE, from_key, to_key @@ -72,12 +89,12 @@ class StreamStore(SQLBaseStore): ret = [self._parse_event_from_row(r) for r in rows] - if ret: - max_id = max([r["ordering"] for r in rows]) + if from_key < to_key: + key = max([r["ordering"] for r in rows]) else: - max_id = to_key + key = min([r["ordering"] for r in rows]) - defer.returnValue((ret, max_id)) + defer.returnValue((ret, key)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): -- cgit 1.4.1 From d260a42ca279fbca46f85b2c96bddc4f814ecef3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:17:36 +0100 Subject: PEP8 cleanups --- synapse/api/events/room.py | 1 + synapse/handlers/room.py | 17 ++++++++++------- synapse/storage/__init__.py | 6 +++++- synapse/storage/_base.py | 3 ++- synapse/storage/room.py | 15 +++++++++++---- synapse/storage/roommember.py | 1 - synapse/storage/stream.py | 2 -- 7 files changed, 29 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index d9b3d572fa..dbf537fb88 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -20,6 +20,7 @@ class GenericEvent(SynapseEvent): def get_content_template(self): return {} + class RoomTopicEvent(SynapseEvent): TYPE = "m.room.topic" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b0b2441b9f..4ecfb278be 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -114,8 +114,9 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [EventsStreamData(self.hs, room_id=room_id, - feedback=feedback)] + data_source = [ + EventsStreamData(self.hs, room_id=room_id, feedback=feedback) + ] event_stream = EventStream(user_id, data_source) pagin_config = yield event_stream.fix_tokens(pagin_config) data_chunk = yield event_stream.get_chunk(config=pagin_config) @@ -196,7 +197,9 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_current_state(room_id, event_type, state_key) + data = yield self.store.get_current_state( + room_id, event_type, state_key + ) defer.returnValue(data) @defer.inlineCallbacks @@ -496,7 +499,7 @@ class RoomMemberHandler(BaseHandler): SynapseError if there was a problem changing the membership. """ - #broadcast_msg = False + # broadcast_msg = False prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id @@ -570,7 +573,8 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True): + def _do_join(self, event, room_host=None, do_auth=True, + broadcast_msg=True): joinee = self.hs.parse_userid(event.target_user_id) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -621,7 +625,6 @@ class RoomMemberHandler(BaseHandler): broadcast_msg=broadcast_msg, ) - if should_do_dance: yield self._do_invite_join_dance( room_id=room_id, @@ -755,7 +758,7 @@ class RoomMemberHandler(BaseHandler): room_id, "", is_public=False ) - #yield self.state_handler.handle_new_event(event) + # yield self.state_handler.handle_new_event(event) yield federation.handle_new_event(new_event) yield federation.get_state_for_room( target_host, room_id diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 750e86040e..ad36d14a36 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -87,7 +87,11 @@ class DataStore(RoomMemberStore, RoomStore, "content": json.dumps(event.content), } - unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()} + unrec = { + k: v + for k, v in event.get_full_dict().items() + if k not in vals.keys() + } vals["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 413838f798..36cc57c1b8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,7 +60,8 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ if decoder else None + "[SQL] %s Args=%s Func=%s", + query, args, decoder.__name__ if decoder else None ) def interaction(txn): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8f44b67d8c..22f2dcca45 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -87,19 +87,26 @@ class RoomStore(SQLBaseStore): """ topic_subquery = ( - "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics " + "SELECT topics.event_id as event_id, " + "topics.room_id as room_id, topic " + "FROM topics " "INNER JOIN current_state_events as c " "ON c.event_id = topics.event_id " ) name_subquery = ( - "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names " + "SELECT room_names.event_id as event_id, " + "room_names.room_id as room_id, name " + "FROM room_names " "INNER JOIN current_state_events as c " "ON c.event_id = room_names.event_id " ) + # We use non printing ascii character US () as a seperator sql = ( - "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r " + "SELECT r.room_id, n.name, t.topic, " + "group_concat(a.room_alias, '') " + "FROM rooms AS r " "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " @@ -117,7 +124,7 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split(","), + "aliases": r[3].split(""), } for r in rows ] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index a0620677b9..89c87290cf 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -62,7 +62,6 @@ class RoomMemberStore(SQLBaseStore): yield self._execute(None, sql, event.room_id, domain) - @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7c2c45e0ff..cf4b1682b6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -61,7 +61,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], to_key)) return - sql = ( "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " @@ -90,7 +89,6 @@ class StreamStore(SQLBaseStore): ret = [self._parse_event_from_row(r) for r in rows] - if rows: if from_key < to_key: key = max([r["ordering"] for r in rows]) -- cgit 1.4.1 From 6efc688917cac03f91c7337015741f2c6e82e0f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:47:09 +0100 Subject: Fix typo of key name --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4ecfb278be..14ffddc630 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -603,7 +603,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.INVITE: room = yield self.store.get_room(room_id) inviter = UserID.from_string( - prev_state.sender, self.hs + prev_state.user_id, self.hs ) should_do_dance = not inviter.is_mine and not room -- cgit 1.4.1 From dccb2f57be566c4cbf8cc413c78eed79036d7049 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 10:59:04 +0100 Subject: Disable the ability to GET individualy messages. We need to think about the correct API to do this, as the current one doesn't make much sense. --- synapse/handlers/room.py | 12 ++++---- tests/rest/test_rooms.py | 72 ++++++++++++++++++++++++------------------------ 2 files changed, 43 insertions(+), 41 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 14ffddc630..cdc98d2b08 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -59,12 +59,14 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # Pull out the message from the db - msg = yield self.store.get_message(room_id=room_id, - msg_id=msg_id, - user_id=sender_id) +# msg = yield self.store.get_message( +# room_id=room_id, +# msg_id=msg_id, +# user_id=sender_id +# ) + + # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this. - if msg: - defer.returnValue(msg) defer.returnValue(None) @defer.inlineCallbacks diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py index 86025103db..3ac2bbdd0f 100644 --- a/tests/rest/test_rooms.py +++ b/tests/rest/test_rooms.py @@ -104,36 +104,36 @@ class RoomPermissionsTestCase(RestTestCase): def tearDown(self): pass - @defer.inlineCallbacks - def test_get_message(self): - # get message in uncreated room, expect 403 - (code, response) = yield self.mock_server.trigger_get( - "/rooms/noroom/messages/someid/m1") - self.assertEquals(403, code, msg=str(response)) - - # get message in created room not joined (no state), expect 403 - (code, response) = yield self.mock_server.trigger_get( - self.created_rmid_msg_path) - self.assertEquals(403, code, msg=str(response)) - - # get message in created room and invited, expect 403 - yield self.invite(room=self.created_rmid, src=self.rmcreator_id, - targ=self.user_id) - (code, response) = yield self.mock_server.trigger_get( - self.created_rmid_msg_path) - self.assertEquals(403, code, msg=str(response)) - - # get message in created room and joined, expect 200 - yield self.join(room=self.created_rmid, user=self.user_id) - (code, response) = yield self.mock_server.trigger_get( - self.created_rmid_msg_path) - self.assertEquals(200, code, msg=str(response)) - - # get message in created room and left, expect 403 - yield self.leave(room=self.created_rmid, user=self.user_id) - (code, response) = yield self.mock_server.trigger_get( - self.created_rmid_msg_path) - self.assertEquals(403, code, msg=str(response)) +# @defer.inlineCallbacks +# def test_get_message(self): +# # get message in uncreated room, expect 403 +# (code, response) = yield self.mock_server.trigger_get( +# "/rooms/noroom/messages/someid/m1") +# self.assertEquals(403, code, msg=str(response)) +# +# # get message in created room not joined (no state), expect 403 +# (code, response) = yield self.mock_server.trigger_get( +# self.created_rmid_msg_path) +# self.assertEquals(403, code, msg=str(response)) +# +# # get message in created room and invited, expect 403 +# yield self.invite(room=self.created_rmid, src=self.rmcreator_id, +# targ=self.user_id) +# (code, response) = yield self.mock_server.trigger_get( +# self.created_rmid_msg_path) +# self.assertEquals(403, code, msg=str(response)) +# +# # get message in created room and joined, expect 200 +# yield self.join(room=self.created_rmid, user=self.user_id) +# (code, response) = yield self.mock_server.trigger_get( +# self.created_rmid_msg_path) +# self.assertEquals(200, code, msg=str(response)) +# +# # get message in created room and left, expect 403 +# yield self.leave(room=self.created_rmid, user=self.user_id) +# (code, response) = yield self.mock_server.trigger_get( +# self.created_rmid_msg_path) +# self.assertEquals(403, code, msg=str(response)) @defer.inlineCallbacks def test_send_message(self): @@ -913,9 +913,9 @@ class RoomMessagesTestCase(RestTestCase): (code, response) = yield self.mock_server.trigger("PUT", path, content) self.assertEquals(200, code, msg=str(response)) - (code, response) = yield self.mock_server.trigger("GET", path, None) - self.assertEquals(200, code, msg=str(response)) - self.assert_dict(json.loads(content), response) + # (code, response) = yield self.mock_server.trigger("GET", path, None) + # self.assertEquals(200, code, msg=str(response)) + # self.assert_dict(json.loads(content), response) # m.text message type path = "/rooms/%s/messages/%s/mid2" % ( @@ -925,9 +925,9 @@ class RoomMessagesTestCase(RestTestCase): (code, response) = yield self.mock_server.trigger("PUT", path, content) self.assertEquals(200, code, msg=str(response)) - (code, response) = yield self.mock_server.trigger("GET", path, None) - self.assertEquals(200, code, msg=str(response)) - self.assert_dict(json.loads(content), response) + # (code, response) = yield self.mock_server.trigger("GET", path, None) + # self.assertEquals(200, code, msg=str(response)) + # self.assert_dict(json.loads(content), response) # trying to send message in different user path path = "/rooms/%s/messages/%s/mid2" % ( -- cgit 1.4.1 From 4eb8f84aa8fc735e228f66d11c355625bb14c1cf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 16:20:21 +0100 Subject: Make snapshot_all_rooms return results in the correct form, including start and end tokens. --- synapse/handlers/room.py | 9 +++++++-- synapse/storage/stream.py | 25 +++++++++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cdc98d2b08..9e10235fa0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -269,11 +269,16 @@ class MessageHandler(BaseHandler): if event.membership != Membership.JOIN: continue try: - messages = yield self.store.get_recent_events_for_room( + messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=50, ) - d["messages"] = [m.get_dict() for m in messages] + + d["messages"] = { + "chunk": [m.get_dict() for m in messages], + "start": token[0], + "end": token[1], + } except: pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f7968f576f..6728a4b5ea 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -104,19 +104,36 @@ class StreamStore(SQLBaseStore): def get_recent_events_for_room(self, room_id, limit, with_feedback=False): # TODO (erikj): Handle compressed feedback + end_token = yield self.get_room_events_max_id() + sql = ( - "SELECT * FROM events WHERE room_id = ? " - "ORDER BY token_ordering, rowid DESC LIMIT ? " + "SELECT * FROM events WHERE " + "WHERE room_id = ? AND token_ordering <= ? " + "ORDER BY topological_ordering, rowid DESC LIMIT ? " ) rows = yield self._execute_and_decode( sql, - room_id, limit + room_id, end_token, limit ) rows.reverse() # As we selected with reverse ordering - defer.returnValue([self._parse_event_from_row(r) for r in rows]) + if rows: + topo = rows[0]["topological_ordering"] + row_id = rows[0]["rowid"] + start_token = "p%s-%s" % (topo, row_id) + + token = (start_token, end_token) + else: + token = ("START", end_token) + + defer.returnValue( + ( + [self._parse_event_from_row(r) for r in rows], + token + ) + ) @defer.inlineCallbacks def get_room_events_max_id(self): -- cgit 1.4.1 From 1422a22970e4ab3f7a37fe672af2f1d1de901c10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 16:25:18 +0100 Subject: Fix typos in SQL and where we still had rowid's (which no longer exist) --- synapse/handlers/room.py | 2 +- synapse/storage/stream.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9e10235fa0..e5dd2e245f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -280,7 +280,7 @@ class MessageHandler(BaseHandler): "end": token[1], } except: - pass + logger.exception("Failed to get snapshot") logger.debug("snapshot_all_rooms returning: %s", ret) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6728a4b5ea..c03c983e14 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -74,13 +74,13 @@ class StreamStore(SQLBaseStore): if from_key < to_key: sql += ( "AND e.token_ordering > ? AND e.token_ordering < ? " - "ORDER BY token_ordering, rowid ASC LIMIT %(limit)d " + "ORDER BY token_ordering ASC LIMIT %(limit)d " ) % {"limit": limit} else: sql += ( "AND e.token_ordering < ? " "AND e.token_ordering > ? " - "ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d " + "ORDER BY e.token_ordering DESC LIMIT %(limit)d " ) % {"limit": int(limit)} rows = yield self._execute_and_decode( @@ -107,9 +107,9 @@ class StreamStore(SQLBaseStore): end_token = yield self.get_room_events_max_id() sql = ( - "SELECT * FROM events WHERE " + "SELECT * FROM events " "WHERE room_id = ? AND token_ordering <= ? " - "ORDER BY topological_ordering, rowid DESC LIMIT ? " + "ORDER BY topological_ordering, token_ordering DESC LIMIT ? " ) rows = yield self._execute_and_decode( @@ -121,8 +121,8 @@ class StreamStore(SQLBaseStore): if rows: topo = rows[0]["topological_ordering"] - row_id = rows[0]["rowid"] - start_token = "p%s-%s" % (topo, row_id) + toke = rows[0]["token_ordering"] + start_token = "p%s-%s" % (topo, toke) token = (start_token, end_token) else: -- cgit 1.4.1 From 598a1d8ff953c70f9f54564225d693a1bcf42144 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:19:48 +0100 Subject: Change the way pagination works to support out of order events. --- synapse/api/streams/__init__.py | 19 ++- synapse/api/streams/event.py | 92 +++++----- synapse/handlers/presence.py | 2 +- synapse/handlers/room.py | 3 +- synapse/storage/schema/im.sql | 2 +- synapse/storage/stream.py | 186 +++++++++++++++++---- .../components/matrix/event-stream-service.js | 1 - webclient/components/matrix/matrix-service.js | 4 +- 8 files changed, 227 insertions(+), 82 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py index 989e63f9ec..44f4cc6078 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/api/streams/__init__.py @@ -20,23 +20,23 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" - def __init__(self, from_tok=None, to_tok=None, limit=0): + def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): self.from_tok = from_tok self.to_tok = to_tok + self.direction = direction self.limit = limit @classmethod def from_request(cls, request, raise_invalid_params=True): params = { - "from_tok": PaginationStream.TOK_START, - "to_tok": PaginationStream.TOK_END, - "limit": 0 + "direction": 'f', } query_param_mappings = [ # 3-tuple of qp_key, attribute, rules ("from", "from_tok", lambda x: type(x) == str), ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()) + ("limit", "limit", lambda x: x.isdigit()), + ("dir", "direction", lambda x: x == 'f' or x == 'b'), ] for qp, attr, is_valid in query_param_mappings: @@ -48,12 +48,17 @@ class PaginationConfig(object): return PaginationConfig(**params) + def __str__(self): + return ( + "" + ) % (self.from_tok, self.to_tok, self.direction, self.limit) + class PaginationStream(object): """ An interface for streaming data as chunks. """ - TOK_START = "START" TOK_END = "END" def get_chunk(self, config=None): @@ -76,7 +81,7 @@ class StreamData(object): self.hs = hs self.store = hs.get_datastore() - def get_rows(self, user_id, from_pkey, to_pkey, limit): + def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): """ Get event stream data between the specified pkeys. Args: diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 414b05be30..a5c8b2b31f 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -38,8 +38,8 @@ class EventsStreamData(StreamData): self.with_feedback = feedback @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - data, latest_ver = yield self.store.get_room_events_stream( + def get_rows(self, user_id, from_key, to_key, limit, direction): + data, latest_ver = yield self.store.get_room_events( user_id=user_id, from_key=from_key, to_key=to_key, @@ -70,6 +70,15 @@ class EventStream(PaginationStream): pagination_config.from_tok) pagination_config.to_tok = yield self.fix_token( pagination_config.to_tok) + + if ( + not pagination_config.to_tok + and pagination_config.direction == 'f' + ): + pagination_config.to_tok = yield self.get_current_max_token() + + logger.debug("pagination_config: %s", pagination_config) + defer.returnValue(pagination_config) @defer.inlineCallbacks @@ -81,39 +90,42 @@ class EventStream(PaginationStream): Returns: The fixed-up token, which may == token. """ - # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. - replacements = [ - (PaginationStream.TOK_START, "0"), - (PaginationStream.TOK_END, "-1") - ] - for magic_token, key in replacements: - if magic_token == token: - token = EventStream.SEPARATOR.join( - [key] * len(self.stream_data) - ) - - # replace -1 values with an actual pkey - token_segments = self._split_token(token) - for i, tok in enumerate(token_segments): - if tok == -1: - # add 1 to the max token because results are EXCLUSIVE from the - # latest version. - token_segments[i] = 1 + (yield self.stream_data[i].max_token()) - defer.returnValue(EventStream.SEPARATOR.join( - str(x) for x in token_segments - )) + if token == PaginationStream.TOK_END: + new_token = yield self.get_current_max_token() + + logger.debug("fix_token: From %s to %s", token, new_token) + + token = new_token + + defer.returnValue(token) @defer.inlineCallbacks - def get_chunk(self, config=None): + def get_current_max_token(self): + new_token_parts = [] + for s in self.stream_data: + mx = yield s.max_token() + new_token_parts.append(str(mx)) + + new_token = EventStream.SEPARATOR.join(new_token_parts) + + logger.debug("get_current_max_token: %s", new_token) + + defer.returnValue(new_token) + + @defer.inlineCallbacks + def get_chunk(self, config): # no support for limit on >1 streams, makes no sense. if config.limit and len(self.stream_data) > 1: raise EventStreamError( 400, "Limit not supported on multiplexed streams." ) - (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, - config.to_tok, - config.limit) + chunk_data, next_tok = yield self._get_chunk_data( + config.from_tok, + config.to_tok, + config.limit, + config.direction, + ) defer.returnValue({ "chunk": chunk_data, @@ -122,7 +134,7 @@ class EventStream(PaginationStream): }) @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit): + def _get_chunk_data(self, from_tok, to_tok, limit, direction): """ Get event data between the two tokens. Tokens are SEPARATOR separated values representing pkey values of @@ -140,11 +152,12 @@ class EventStream(PaginationStream): EventStreamError if something went wrong. """ # sanity check - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") + if to_tok is not None: + if (from_tok.count(EventStream.SEPARATOR) != + to_tok.count(EventStream.SEPARATOR) or + (from_tok.count(EventStream.SEPARATOR) + 1) != + len(self.stream_data)): + raise EventStreamError(400, "Token lengths don't match.") chunk = [] next_ver = [] @@ -158,7 +171,7 @@ class EventStream(PaginationStream): continue (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit + self.user_id, from_pkey, to_pkey, limit, direction, ) chunk.extend([ @@ -177,9 +190,8 @@ class EventStream(PaginationStream): Returns: A list of ints. """ - segments = token.split(EventStream.SEPARATOR) - try: - int_segments = [int(x) for x in segments] - except ValueError: - raise EventStreamError(400, "Bad token: %s" % token) - return int_segments + if token: + segments = token.split(EventStream.SEPARATOR) + else: + segments = [None] * len(self.stream_data) + return segments diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e8cb83eddb..f140dc527a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -684,7 +684,7 @@ class PresenceStreamData(StreamData): super(PresenceStreamData, self).__init__(hs) self.presence = hs.get_handlers().presence_handler - def get_rows(self, user_id, from_key, to_key, limit): + def get_rows(self, user_id, from_key, to_key, limit, direction): cachemap = self.presence._user_cachemap # TODO(paul): limit, and filter by visibility diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e5dd2e245f..40867ae2e0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -466,7 +466,7 @@ class RoomMemberHandler(BaseHandler): for entry in member_list ] chunk_data = { - "start": "START", + "start": "START", # FIXME (erikj): START is no longer a valid value "end": "END", "chunk": event_list } @@ -811,4 +811,5 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): chunk = yield self.store.get_rooms(is_public=True) + # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 0fb3dbee55..ea04261ff0 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS events( - token_ordering INTEGER PRIMARY KEY AUTOINCREMENT, + stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, topological_ordering INTEGER NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c03c983e14..87fc978813 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -16,8 +16,9 @@ from twisted.internet import defer from ._base import SQLBaseStore - +from synapse.api.errors import SynapseError from synapse.api.constants import Membership +from synapse.util.logutils import log_function import json import logging @@ -29,9 +30,96 @@ logger = logging.getLogger(__name__) MAX_STREAM_SIZE = 1000 +_STREAM_TOKEN = "stream" +_TOPOLOGICAL_TOKEN = "topological" + + +def _parse_stream_token(string): + try: + if string[0] != 's': + raise + return int(string[1:]) + except: + logger.debug("Not stream token: %s", string) + raise SynapseError(400, "Invalid token") + + +def _parse_topological_token(string): + try: + if string[0] != 't': + raise + parts = string[1:].split('-', 1) + return (int(parts[0]), int(parts[1])) + except: + logger.debug("Not topological token: %s", string) + raise SynapseError(400, "Invalid token") + + +def is_stream_token(string): + try: + _parse_stream_token(string) + return True + except: + return False + + +def is_topological_token(string): + try: + _parse_topological_token(string) + return True + except: + return False + + +def _get_token_bound(token, comparison): + try: + s = _parse_stream_token(token) + return "%s %s %d" % ("stream_ordering", comparison, s) + except: + pass + + try: + top, stream = _parse_topological_token(token) + return "%s %s %d AND %s %s %d" % ( + "topological_ordering", comparison, top, + "stream_ordering", comparison, stream, + ) + except: + pass + + raise SynapseError(400, "Invalid token") + + class StreamStore(SQLBaseStore): + @log_function + def get_room_events(self, user_id, from_key, to_key, room_id, limit=0, + direction='f', with_feedback=False): + is_events = ( + direction == 'f' + and is_stream_token(from_key) + and to_key and is_stream_token(to_key) + ) + + if is_events: + return self.get_room_events_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + room_id=room_id, + limit=limit, + with_feedback=with_feedback, + ) + else: + return self.paginate_room_events( + from_key=from_key, + to_key=to_key, + room_id=room_id, + limit=limit, + with_feedback=with_feedback, + ) @defer.inlineCallbacks + @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): # TODO (erikj): Handle compressed feedback @@ -54,8 +142,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_key = int(from_key) - to_key = int(to_key) + from_id = _parse_stream_token(from_key) + to_id = _parse_stream_token(to_key) if from_key == to_key: defer.returnValue(([], to_key)) @@ -65,41 +153,78 @@ class StreamStore(SQLBaseStore): "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " + "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "current": current_room_membership_sql, "invites": invites_sql, + "limit": limit } - # Constraints and ordering depend on direction. - if from_key < to_key: - sql += ( - "AND e.token_ordering > ? AND e.token_ordering < ? " - "ORDER BY token_ordering ASC LIMIT %(limit)d " - ) % {"limit": limit} - else: - sql += ( - "AND e.token_ordering < ? " - "AND e.token_ordering > ? " - "ORDER BY e.token_ordering DESC LIMIT %(limit)d " - ) % {"limit": int(limit)} - rows = yield self._execute_and_decode( sql, - user_id, user_id, Membership.INVITE, from_key, to_key + user_id, user_id, Membership.INVITE, from_id, to_id ) ret = [self._parse_event_from_row(r) for r in rows] if rows: - if from_key < to_key: - key = max([r["token_ordering"] for r in rows]) - else: - key = min([r["token_ordering"] for r in rows]) + key = "s%d" % max([r["stream_ordering"] for r in rows]) else: + # Assume we didn't get anything because there was nothing to get. key = to_key defer.returnValue((ret, key)) + @defer.inlineCallbacks + @log_function + def paginate_room_events(self, room_id, from_key, to_key=None, + direction='b', limit=-1, + with_feedback=False): + # TODO (erikj): Handle compressed feedback + + from_comp = '<' if direction =='b' else '>' + to_comp = '>' if direction =='b' else '<' + order = "DESC" if direction == 'b' else "ASC" + + args = [room_id] + + bounds = _get_token_bound(from_key, from_comp) + if to_key: + bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp)) + + if int(limit) > 0: + args.append(int(limit)) + limit_str = " LIMIT ?" + else: + limit_str = "" + + sql = ( + "SELECT * FROM events " + "WHERE room_id = ? AND %(bounds)s " + "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " + ) % {"bounds": bounds, "order": order, "limit": limit_str} + + rows = yield self._execute_and_decode( + sql, + *args + ) + + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + next_token = "t%s-%s" % (topo, toke) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key + + defer.returnValue( + ( + [self._parse_event_from_row(r) for r in rows], + next_token + ) + ) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): # TODO (erikj): Handle compressed feedback @@ -108,8 +233,8 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events " - "WHERE room_id = ? AND token_ordering <= ? " - "ORDER BY topological_ordering, token_ordering DESC LIMIT ? " + "WHERE room_id = ? AND stream_ordering <= ? " + "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? " ) rows = yield self._execute_and_decode( @@ -121,12 +246,12 @@ class StreamStore(SQLBaseStore): if rows: topo = rows[0]["topological_ordering"] - toke = rows[0]["token_ordering"] + toke = rows[0]["stream_ordering"] start_token = "p%s-%s" % (topo, toke) token = (start_token, end_token) else: - token = ("START", end_token) + token = (end_token, end_token) defer.returnValue( ( @@ -138,11 +263,14 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_max_id(self): res = yield self._execute_and_decode( - "SELECT MAX(token_ordering) as m FROM events" + "SELECT MAX(stream_ordering) as m FROM events" ) - if not res: - defer.returnValue(0) + logger.debug("get_room_events_max_id: %s", res) + + if not res or not res[0] or not res[0]["m"]: + defer.returnValue("s1") return - defer.returnValue(res[0]["m"]) + key = res[0]["m"] + 1 + defer.returnValue("s%d" % (key,)) diff --git a/webclient/components/matrix/event-stream-service.js b/webclient/components/matrix/event-stream-service.js index c94cf0fe72..a446fad5d4 100644 --- a/webclient/components/matrix/event-stream-service.js +++ b/webclient/components/matrix/event-stream-service.js @@ -25,7 +25,6 @@ the eventHandlerService. angular.module('eventStreamService', []) .factory('eventStreamService', ['$q', '$timeout', 'matrixService', 'eventHandlerService', function($q, $timeout, matrixService, eventHandlerService) { var END = "END"; - var START = "START"; var TIMEOUT_MS = 30000; var ERR_TIMEOUT_MS = 5000; diff --git a/webclient/components/matrix/matrix-service.js b/webclient/components/matrix/matrix-service.js index c52c94c310..3cd0aa674b 100644 --- a/webclient/components/matrix/matrix-service.js +++ b/webclient/components/matrix/matrix-service.js @@ -230,8 +230,8 @@ angular.module('matrixService', []) path = path.replace("$room_id", room_id); var params = { from: from_token, - to: "START", - limit: limit + limit: limit, + dir: 'b' }; return doRequest("GET", path, params); }, -- cgit 1.4.1 From 75b6d982a01a431a89d2ab76d91a09159630d059 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:20:03 +0100 Subject: Add a 'backfill room' button --- synapse/federation/handler.py | 22 +++++++++++++++------- synapse/federation/replication.py | 9 ++++++--- synapse/handlers/federation.py | 21 ++++++++++++++++++--- synapse/rest/room.py | 16 ++++++++++++++++ synapse/storage/__init__.py | 22 ++++++++++++++++++---- synapse/storage/pdu.py | 16 ++++++++-------- 6 files changed, 81 insertions(+), 25 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 68243d31d0..984c1558e9 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -74,10 +74,18 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def backfill(self, room_id, limit): - # TODO: Work out which destinations to ask for backfill - # self.replication_layer.backfill(dest, room_id, limit) - pass + def backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) @log_function def get_state_for_room(self, destination, room_id): @@ -87,7 +95,7 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu): + def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it throught the StateHandler. """ @@ -95,7 +103,7 @@ class FederationEventHandler(object): try: with (yield self.lock_manager.lock(pdu.context)): - if event.is_state: + if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state( pdu ) @@ -104,7 +112,7 @@ class FederationEventHandler(object): else: is_new_state = False - yield self.event_handler.on_receive(event, is_new_state) + yield self.event_handler.on_receive(event, is_new_state, backfilled) except AuthError: # TODO: Implement something in federation that allows us to diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index bc9df2f214..3e5f1a4108 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -208,7 +208,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(pdu, backfilled=True) defer.returnValue(pdus) @@ -415,7 +415,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu): + def _handle_new_pdu(self, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -451,7 +451,10 @@ class ReplicationLayer(object): # Persist the Pdu, but don't mark it as processed yet. yield self.pdu_actions.persist_received(pdu) - ret = yield self.handler.on_receive_pdu(pdu) + if not backfilled: + ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + else: + ret = None yield self.pdu_actions.mark_as_processed(pdu) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7026df90a2..ef9ed274df 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -35,7 +35,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state): + def on_receive(self, event, is_new_state, backfilled): if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -70,6 +70,21 @@ class FederationHandler(BaseHandler): else: with (yield self.room_lock.lock(event.room_id)): - store_id = yield self.store.persist_event(event) + store_id = yield self.store.persist_event(event, backfilled) - yield self.notifier.on_new_room_event(event, store_id) + if not backfilled: + yield self.notifier.on_new_room_event(event, store_id) + + + @log_function + @defer.inlineCallbacks + def backfill(self, dest, room_id, limit): + events = yield self.hs.get_federation().backfill(dest, room_id, limit) + + for event in events: + try: + yield self.store.persist_event(event, backfilled=True) + except: + logger.debug("Failed to persiste event: %s", event) + + defer.returnValue(events) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index dfb2aabe70..89ea9f0d25 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -383,6 +383,21 @@ class RoomMessageListRestServlet(RestServlet): defer.returnValue((200, msgs)) +class RoomTriggerBackfill(RestServlet): + PATTERN = client_path_pattern("/rooms/(?P[^/]*)/backfill$") + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + remote_server = urllib.unquote(request.args["remote"][0]) + room_id = urllib.unquote(room_id) + limit = int(request.args["limit"][0]) + + handler = self.handlers.federation_handler + events = yield handler.backfill(remote_server, room_id, limit) + + res = [event.get_dict() for event in events] + defer.returnValue((200, res)) + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -403,3 +418,4 @@ def register_servlets(hs, http_server): RoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) + RoomTriggerBackfill(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b846081d49..2243a710db 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,6 +20,8 @@ from synapse.api.events.room import ( RoomConfigEvent, RoomNameEvent, ) +from synapse.util.logutils import log_function + from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore from .transactions import TransactionStore import json +import logging import os +logger = logging.getLogger(__name__) + + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, @@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token = None @defer.inlineCallbacks + @log_function def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: yield self._store_room_member(event) @@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event) @defer.inlineCallbacks + @log_function def _store_event(self, event, backfilled): # FIXME (erikj): This should be removed when we start amalgamating # event and pdu storage @@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore, if not self.min_token_deferred.called: yield self.min_token_deferred self.min_token -= 1 - vals["token_ordering"] = self.min_token + vals["stream_ordering"] = self.min_token unrec = { k: v @@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore, } vals["unrecognized_keys"] = json.dumps(unrec) - yield self._simple_insert("events", vals) + try: + yield self._simple_insert("events", vals) + except: + logger.exception("Failed to persist, probably duplicate") + return if not backfilled and hasattr(event, "state_key"): vals = { @@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore, def _get_min_token(self): row = yield self._execute( None, - "SELECT MIN(token_ordering) FROM events" + "SELECT MIN(stream_ordering) FROM events" ) - self.min_token = rows[0][0] if rows and rows[0] else 0 + self.min_token = min(row[0][0], -1) if row and row[0] else -1 + + logger.debug("min_token is: %s", self.min_token) defer.returnValue(self.min_token) diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index a24ce7ab78..7655f43ede 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table, JoinHelper from synapse.util.logutils import log_function @@ -319,6 +321,7 @@ class PduStore(SQLBaseStore): return [(row[0], row[1], row[2]) for row in results] + @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): """Get a list of Pdus that we haven't backfilled beyond yet (and haven't seen). This list is used when we want to backfill backwards and is the @@ -331,17 +334,14 @@ class PduStore(SQLBaseStore): Returns: list: A list of PduIdTuple. """ - return self._db_pool.runInteraction( - self._get_oldest_pdus_in_context, context - ) - - def _get_oldest_pdus_in_context(self, txn, context): - txn.execute( + results = yield self._execute( + None, "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" % {"back": PduBackwardExtremitiesTable.table_name, }, - (context,) + context ) - return [PduIdTuple(i, o) for i, o in txn.fetchall()] + + defer.returnValue([PduIdTuple(i, o) for i, o in results]) def is_pdu_new(self, pdu_id, origin, context, depth): """For a given Pdu, try and figure out if it's 'new', i.e., if it's -- cgit 1.4.1 From 234128586bd210a496bea7aef7045cd5905b8b5c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:30:28 +0100 Subject: Print out stacktrace when we failed to persist event. --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ef9ed274df..0430a8307e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -85,6 +85,6 @@ class FederationHandler(BaseHandler): try: yield self.store.persist_event(event, backfilled=True) except: - logger.debug("Failed to persiste event: %s", event) + logger.exception("Failed to persist event: %s", event) defer.returnValue(events) -- cgit 1.4.1 From 5c00614aaba881c354cb9eecf024aa3a84838c4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 15:51:10 +0100 Subject: PresenceStreamData was expecting *_key to be ints --- synapse/handlers/presence.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 60684f17d7..319e3c7c81 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -689,6 +689,9 @@ class PresenceStreamData(StreamData): self.presence = hs.get_handlers().presence_handler def get_rows(self, user_id, from_key, to_key, limit, direction): + from_key = int(from_key) + to_key = int(to_key) + cachemap = self.presence._user_cachemap # TODO(paul): limit, and filter by visibility -- cgit 1.4.1 From 849627b82e751071f80c96f62c9e59a2565cd85c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Aug 2014 11:50:16 +0100 Subject: Don't generate room membership messages. Include previous state of in membership messages. --- synapse/handlers/room.py | 17 ++++++++++------- tests/rest/test_rooms.py | 7 ++++++- 2 files changed, 16 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 40867ae2e0..7ab881847d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -506,18 +506,21 @@ class RoomMemberHandler(BaseHandler): SynapseError if there was a problem changing the membership. """ - # broadcast_msg = False + broadcast_msg = False prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id ) - if prev_state and prev_state.membership == event.membership: - # treat this event as a NOOP. - if do_auth: # This is mainly to fix a unit test. - yield self.auth.check(event, raises=True) - defer.returnValue({}) - return + if prev_state: + event.content["prev"] = prev_state.membership + +# if prev_state and prev_state.membership == event.membership: +# # treat this event as a NOOP. +# if do_auth: # This is mainly to fix a unit test. +# yield self.auth.check(event, raises=True) +# defer.returnValue({}) +# return room_id = event.room_id diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py index e873181044..a9b66df912 100644 --- a/tests/rest/test_rooms.py +++ b/tests/rest/test_rooms.py @@ -794,7 +794,12 @@ class RoomMemberStateTestCase(RestTestCase): (code, response) = yield self.mock_resource.trigger("GET", path, None) self.assertEquals(200, code, msg=str(response)) - self.assertEquals(json.loads(content), response) + + expected_response = { + "membership": Membership.JOIN, + "prev": Membership.JOIN, + } + self.assertEquals(expected_response, response) @defer.inlineCallbacks def test_rooms_members_other(self): -- cgit 1.4.1 From 5ef0948eaa48d44822345efe04ec1612a96a4d37 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Aug 2014 14:42:36 +0100 Subject: Better handle the edge cases of trying to remote join rooms --- synapse/handlers/federation.py | 78 ++++++++++++++++++++++++++++++++++++++++++ synapse/handlers/room.py | 47 ++++++------------------- 2 files changed, 88 insertions(+), 37 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0430a8307e..aa3bf273f7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -72,6 +72,34 @@ class FederationHandler(BaseHandler): with (yield self.room_lock.lock(event.room_id)): store_id = yield self.store.persist_event(event, backfilled) + room = yield self.store.get_room(event.room_id) + + if not room: + # Huh, let's try and get the current state + try: + federation = self.hs.get_federation() + yield federation.get_state_for_room( + event.origin, event.room_id + ) + + hosts = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + if self.hs.hostname in hosts: + try: + yield self.store.store_room( + event.room_id, + "", + is_public=False + ) + except: + pass + except: + logger.exception( + "Failed to get current state for room %s", + event.room_id + ) + if not backfilled: yield self.notifier.on_new_room_event(event, store_id) @@ -88,3 +116,53 @@ class FederationHandler(BaseHandler): logger.exception("Failed to persist event: %s", event) defer.returnValue(events) + + @log_function + @defer.inlineCallbacks + def do_invite_join(self, target_host, room_id, joinee, content): + federation = self.hs.get_federation() + + hosts = yield self.store.get_joined_hosts_for_room(room_id) + if self.hs.hostname in hosts: + # We are already in the room. + logger.debug("We're already in the room apparently") + defer.returnValue(False) + + # First get current state to see if we are already joined. + try: + yield federation.get_state_for_room(target_host, room_id) + + hosts = yield self.store.get_joined_hosts_for_room(room_id) + if self.hs.hostname in hosts: + # Oh, we were actually in the room already. + logger.debug("We're already in the room apparently") + defer.returnValue(False) + except Exception: + logger.exception("Failed to get current state") + + new_event = self.event_factory.create_event( + etype=InviteJoinEvent.TYPE, + target_host=target_host, + room_id=room_id, + user_id=joinee, + content=content + ) + + new_event.destinations = [target_host] + + yield federation.handle_new_event(new_event) + + store_id = yield self.store.persist_event(new_event) + self.notifier.on_new_room_event(new_event, store_id) + + try: + yield self.store.store_room( + event.room_id, + "", + is_public=False + ) + except: + pass + + + defer.returnValue(True) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7ab881847d..6ecb6dd0e4 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -599,9 +599,9 @@ class RoomMemberHandler(BaseHandler): # that we are allowed to join when we decide whether or not we # need to do the invite/join dance. - room = yield self.store.get_room(room_id) + hosts = yield self.store.get_joined_hosts_for_room(room_id) - if room: + if self.hs.hostname in hosts: should_do_dance = False elif room_host: should_do_dance = True @@ -621,8 +621,15 @@ class RoomMemberHandler(BaseHandler): else: should_do_dance = False + have_joined = False + if should_do_dance: + handler = self.hs.get_handlers().federation_handler + have_joined = yield handler.do_invite_join( + room_host, room_id, event.user_id, event.content + ) + # We want to do the _do_update inside the room lock. - if not should_do_dance: + if not have_joined: logger.debug("Doing normal join") if do_auth: @@ -635,14 +642,6 @@ class RoomMemberHandler(BaseHandler): broadcast_msg=broadcast_msg, ) - if should_do_dance: - yield self._do_invite_join_dance( - room_id=room_id, - joinee=event.user_id, - target_host=room_host, - content=event.content, - ) - user = self.hs.parse_userid(event.user_id) self.distributor.fire( "user_joined_room", user=user, room_id=room_id @@ -748,32 +747,6 @@ class RoomMemberHandler(BaseHandler): membership=event.content["membership"] ) - @defer.inlineCallbacks - def _do_invite_join_dance(self, room_id, joinee, target_host, content): - logger.debug("Doing remote join dance") - - # do invite join dance - federation = self.hs.get_federation() - new_event = self.event_factory.create_event( - etype=InviteJoinEvent.TYPE, - target_host=target_host, - room_id=room_id, - user_id=joinee, - content=content - ) - - new_event.destinations = [target_host] - - yield self.store.store_room( - room_id, "", is_public=False - ) - - # yield self.state_handler.handle_new_event(event) - yield federation.handle_new_event(new_event) - yield federation.get_state_for_room( - target_host, room_id - ) - @defer.inlineCallbacks def _inject_membership_msg(self, room_id=None, source=None, target=None, membership=None): -- cgit 1.4.1 From 9c0e5704963f232a14545f26a4501b672a32beb4 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 20 Aug 2014 14:58:45 +0100 Subject: Kill the "_homeserver_" injected messages for room membership changes --- synapse/handlers/room.py | 60 ++++-------------------------------------------- synapse/rest/room.py | 4 ++-- 2 files changed, 6 insertions(+), 58 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6ecb6dd0e4..6bdba3f5e1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -383,7 +383,6 @@ class RoomCreationHandler(BaseHandler): yield self.hs.get_handlers().room_member_handler.change_membership( join_event, - broadcast_msg=True, do_auth=False ) @@ -495,19 +494,15 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(member) @defer.inlineCallbacks - def change_membership(self, event=None, broadcast_msg=False, do_auth=True): + def change_membership(self, event=None, do_auth=True): """ Change the membership status of a user in a room. Args: event (SynapseEvent): The membership event - broadcast_msg (bool): True to inject a membership message into this - room on success. Raises: SynapseError if there was a problem changing the membership. """ - broadcast_msg = False - prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id ) @@ -528,9 +523,7 @@ class RoomMemberHandler(BaseHandler): # if this HS is not currently in the room, i.e. we have to do the # invite/join dance. if event.membership == Membership.JOIN: - yield self._do_join( - event, do_auth=do_auth, broadcast_msg=broadcast_msg - ) + yield self._do_join(event, do_auth=do_auth) else: # This is not a JOIN, so we can handle it normally. if do_auth: @@ -548,7 +541,6 @@ class RoomMemberHandler(BaseHandler): yield self._do_local_membership_update( event, membership=event.content["membership"], - broadcast_msg=broadcast_msg, ) defer.returnValue({"room_id": room_id}) @@ -583,8 +575,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True, - broadcast_msg=True): + def _do_join(self, event, room_host=None, do_auth=True): joinee = self.hs.parse_userid(event.target_user_id) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -639,7 +630,6 @@ class RoomMemberHandler(BaseHandler): yield self._do_local_membership_update( event, membership=event.content["membership"], - broadcast_msg=broadcast_msg, ) user = self.hs.parse_userid(event.user_id) @@ -710,7 +700,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue([r.room_id for r in rooms]) @defer.inlineCallbacks - def _do_local_membership_update(self, event, membership, broadcast_msg): + def _do_local_membership_update(self, event, membership): # store membership store_id = yield self.store.persist_event(event) @@ -739,48 +729,6 @@ class RoomMemberHandler(BaseHandler): yield self.hs.get_federation().handle_new_event(event) self.notifier.on_new_room_event(event, store_id) - if broadcast_msg: - yield self._inject_membership_msg( - source=event.user_id, - target=event.target_user_id, - room_id=event.room_id, - membership=event.content["membership"] - ) - - @defer.inlineCallbacks - def _inject_membership_msg(self, room_id=None, source=None, target=None, - membership=None): - # TODO this should be a different type of message, not m.text - if membership == Membership.INVITE: - body = "%s invited %s to the room." % (source, target) - elif membership == Membership.JOIN: - body = "%s joined the room." % (target) - elif membership == Membership.LEAVE: - body = "%s left the room." % (target) - else: - raise RoomError(500, "Unknown membership value %s" % membership) - - membership_json = { - "msgtype": u"m.text", - "body": body, - "membership_source": source, - "membership_target": target, - "membership": membership, - } - - msg_id = "m%s" % int(self.clock.time_msec()) - - event = self.event_factory.create_event( - etype=MessageEvent.TYPE, - room_id=room_id, - user_id="_homeserver_", - msg_id=msg_id, - content=membership_json - ) - - handler = self.hs.get_handlers().message_handler - yield handler.send_message(event, suppress_auth=True) - class RoomListHandler(BaseHandler): diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 1c48e63628..f5b547b963 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -193,7 +193,7 @@ class RoomMemberRestServlet(RestServlet): ) handler = self.handlers.room_member_handler - yield handler.change_membership(event, broadcast_msg=True) + yield handler.change_membership(event) defer.returnValue((200, "")) @defer.inlineCallbacks @@ -220,7 +220,7 @@ class RoomMemberRestServlet(RestServlet): ) handler = self.handlers.room_member_handler - yield handler.change_membership(event, broadcast_msg=True) + yield handler.change_membership(event) defer.returnValue((200, "")) -- cgit 1.4.1 From 50718825bd8d0ecc7ca8e700d2187360857ac8df Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 20 Aug 2014 15:50:37 +0100 Subject: Fix exception name in _fill_out_join_content() exception --- synapse/handlers/room.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6bdba3f5e1..4c297dbe33 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -640,6 +640,8 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _fill_out_join_content(self, user_id, content): # If event doesn't include a display name, add one. + # TODO(paul): This really ought to use the distributor's + # collect_presencelike_data signal instead. profile_handler = self.hs.get_handlers().profile_handler if "displayname" not in content: try: @@ -661,7 +663,7 @@ class RoomMemberHandler(BaseHandler): if avatar_url: content["avatar_url"] = avatar_url except: - logger.exception("Failed to set display_name") + logger.exception("Failed to set avatar_url") @defer.inlineCallbacks def _should_invite_join(self, room_id, prev_state, do_auth): -- cgit 1.4.1 From 583add34fe6908f642a78be9d08a15e0b47498d0 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 20 Aug 2014 16:04:01 +0100 Subject: Use the "collect_presencelike_data" distributor signal instead of re-implementing its behaviour --- synapse/handlers/room.py | 32 ++------------------------------ tests/handlers/test_room.py | 2 ++ 2 files changed, 4 insertions(+), 30 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4c297dbe33..6229ee9bfa 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -581,8 +581,8 @@ class RoomMemberHandler(BaseHandler): room_id = event.room_id # If event doesn't include a display name, add one. - yield self._fill_out_join_content( - joinee, event.content + yield self.distributor.fire( + "collect_presencelike_data", joinee, event.content ) # XXX: We don't do an auth check if we are doing an invite @@ -637,34 +637,6 @@ class RoomMemberHandler(BaseHandler): "user_joined_room", user=user, room_id=room_id ) - @defer.inlineCallbacks - def _fill_out_join_content(self, user_id, content): - # If event doesn't include a display name, add one. - # TODO(paul): This really ought to use the distributor's - # collect_presencelike_data signal instead. - profile_handler = self.hs.get_handlers().profile_handler - if "displayname" not in content: - try: - display_name = yield profile_handler.get_displayname( - user_id - ) - - if display_name: - content["displayname"] = display_name - except: - logger.exception("Failed to set display_name") - - if "avatar_url" not in content: - try: - avatar_url = yield profile_handler.get_avatar_url( - user_id - ) - - if avatar_url: - content["avatar_url"] = avatar_url - except: - logger.exception("Failed to set avatar_url") - @defer.inlineCallbacks def _should_invite_join(self, room_id, prev_state, do_auth): logger.debug("_should_invite_join: room_id: %s", room_id) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index be68f17696..bf71d3be3b 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -69,6 +69,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.distributor = hs.get_distributor() self.hs = hs + self.distributor.declare("collect_presencelike_data") + self.handlers.room_member_handler = RoomMemberHandler(self.hs) self.handlers.profile_handler = ProfileHandler(self.hs) self.room_member_handler = self.handlers.room_member_handler -- cgit 1.4.1 From 1587ea26fef65157f2a35b150f01bd8035e5e785 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 14:38:22 +0100 Subject: Wait for getting a Join in response to an invite/join dance. --- synapse/handlers/_base.py | 1 + synapse/handlers/federation.py | 29 +++++++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index c2f4685c92..3f07b5aa4a 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -24,4 +24,5 @@ class BaseHandler(object): self.notifier = hs.get_notifier() self.room_lock = hs.get_room_lock_manager() self.state_handler = hs.get_state_handler() + self.distributor = hs.get_distributor() self.hs = hs diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index aa3bf273f7..9cff444779 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,6 +32,15 @@ logger = logging.getLogger(__name__) class FederationHandler(BaseHandler): """Handles events that originated from federation.""" + def __init__(self, hs): + super(FederationHandler, self).__init__(hs) + + self.distributor.observe( + "user_joined_room", + self._on_user_joined + ) + + self.waiting_for_join_list = {} @log_function @defer.inlineCallbacks @@ -103,6 +112,13 @@ class FederationHandler(BaseHandler): if not backfilled: yield self.notifier.on_new_room_event(event, store_id) + if event.type == RoomMemberEvent.TYPE: + if event.membership == Membership.JOIN: + user = self.hs.parse_userid(event.target_user_id) + self.distributor.fire( + "user_joined_room", user=user, room_id=event.room_id + ) + @log_function @defer.inlineCallbacks @@ -152,8 +168,10 @@ class FederationHandler(BaseHandler): yield federation.handle_new_event(new_event) - store_id = yield self.store.persist_event(new_event) - self.notifier.on_new_room_event(new_event, store_id) + # TODO (erikj): Time out here. + d = defer.Deferred() + self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d) + yield d try: yield self.store.store_room( @@ -166,3 +184,10 @@ class FederationHandler(BaseHandler): defer.returnValue(True) + + + @log_function + def _on_user_joined(self, user, room_id): + waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) + while waiters: + waiters.pop().callback(None) -- cgit 1.4.1 From c6950b18cca665f6afe8ac00fcfa2322d8b35544 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 15:06:22 +0100 Subject: Return the current state in the initial sync api. --- synapse/handlers/room.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6229ee9bfa..91415afbba 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -279,6 +279,9 @@ class MessageHandler(BaseHandler): "start": token[0], "end": token[1], } + + current_state = yield self.store.get_current_state(event.room_id) + d["state"] = [c.get_dict() for c in current_state] except: logger.exception("Failed to get snapshot") -- cgit 1.4.1 From 3d1cae0e7954085bdc1dd1fca6a4ea4986e3d6f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 15:07:08 +0100 Subject: In the initial sync api, return the inviter for rooms in the 'invited' state --- synapse/handlers/room.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 91415afbba..d9809bd6de 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -264,6 +264,10 @@ class MessageHandler(BaseHandler): "room_id": event.room_id, "membership": event.membership, } + + if event.membership == Membership.INVITE: + d["inviter"] = event.user_id + ret.append(d) if event.membership != Membership.JOIN: -- cgit 1.4.1 From e7ee0b9fc113b1fd29b8cb96eea7a00641e56887 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 16:40:21 +0100 Subject: Change IM sync api to also return the current presence list. --- synapse/handlers/room.py | 24 +++++++++++++++++++++--- synapse/storage/stream.py | 5 ++--- 2 files changed, 23 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d9809bd6de..899b653fb7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomConfigEvent ) from synapse.api.streams.event import EventStream, EventsStreamData +from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -257,7 +258,19 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) - ret = [] + rooms_ret = [] + + now_rooms_token = yield self.store.get_room_events_max_id() + + # FIXME (erikj): Fix this. + presence_stream = PresenceStreamData(self.hs) + now_presence_token = yield presence_stream.max_token() + presence = yield presence_stream.get_rows( + user_id, 0, now_presence_token, None, None + ) + + # FIXME (erikj): We need to not generate this token, + now_token = "%s_%s" % (now_rooms_token, now_presence_token) for event in room_list: d = { @@ -268,14 +281,15 @@ class MessageHandler(BaseHandler): if event.membership == Membership.INVITE: d["inviter"] = event.user_id - ret.append(d) + rooms_ret.append(d) if event.membership != Membership.JOIN: continue try: messages, token = yield self.store.get_recent_events_for_room( event.room_id, - limit=50, + limit=10, + end_token=now_rooms_token, ) d["messages"] = { @@ -289,6 +303,10 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") + user = self.hs.parse_userid(user_id) + + ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} + logger.debug("snapshot_all_rooms returning: %s", ret) defer.returnValue(ret) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index e994017bf2..8bc502483a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -249,11 +249,10 @@ class StreamStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, with_feedback=False): + def get_recent_events_for_room(self, room_id, limit, end_token, + with_feedback=False): # TODO (erikj): Handle compressed feedback - end_token = yield self.get_room_events_max_id() - sql = ( "SELECT * FROM events " "WHERE room_id = ? AND stream_ordering <= ? " -- cgit 1.4.1