From 336987bb8debec9dcdbe27d59ce3889b39f86dbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Aug 2014 16:27:14 +0100 Subject: Initial stab at refactoring the SQL tables, including rejigging some of the storage layer. --- synapse/storage/__init__.py | 113 ++++++++++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 40 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 3c27428c08..4fcef45e93 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -46,50 +46,83 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, self.event_factory = hs.get_event_factory() self.hs = hs + @defer.inlineCallbacks def persist_event(self, event): - if event.type == MessageEvent.TYPE: - return self.store_message( - user_id=event.user_id, - room_id=event.room_id, - msg_id=event.msg_id, - content=json.dumps(event.content) - ) - elif event.type == RoomMemberEvent.TYPE: - return self.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=event.content["membership"] - ) + if event.type == RoomMemberEvent.TYPE: + yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: - return self.store_feedback( - room_id=event.room_id, - msg_id=event.msg_id, - msg_sender_id=event.msg_sender_id, - fb_sender_id=event.user_id, - fb_type=event.feedback_type, - content=json.dumps(event.content) - ) - elif event.type == RoomTopicEvent.TYPE: - return self.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + yield self._store_feedback(event) elif event.type == RoomConfigEvent.TYPE: - if "visibility" in event.content: - visibility = event.content["visibility"] - return self.store_room_config( - room_id=event.room_id, - visibility=visibility - ) - + yield self._store_room_config(event) + + self._store_event(event) + + @defer.inlineCallbacks + def get_event(self, event_id): + events_dict = yield self._simple_select_one( + "events", + {"event_id": event_id}, + [ + "event_id", + "type", + "sender", + "room_id", + "content", + "unrecognized_keys" + ], + ) + + event = self._parse_event_from_row(events_dict) + defer.returnValue(event) + + @defer.inlineCallbacks + def _store_event(self, event): + vals = { + "event_id": event.event_id, + "event_type", event.type, + "sender": event.user_id, + "room_id": event.room_id, + "content": event.content, + } + + unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} + val["unrecognized_keys"] = unrec + + yield self._simple_insert("events", vals) + + if hasattr(event, "state_key"): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "event_type": event.event_type, + "state_key": event.state_key, + } + + if hasattr(event, "prev_state"): + vals["prev_state"] = event.prev_state + + yield self._simple_insert("state_events", vals) + + # TODO (erikj): We also need to update the current state table? + + @defer.inlineCallbacks + def get_current_state(room_id, event_type=None, state_key="") + sql = ( + "SELECT e.* FROM events as e" + "INNER JOIN current_state as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) + + if event_type: + sql += " s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) else: - raise NotImplementedError( - "Don't know how to persist type=%s" % event.type - ) + args = (room_id, ) + + results = yield self._execute_query(sql, *args) + + defer.returnValue( def schema_path(schema): -- cgit 1.5.1 From 6d6a1c3454ae787e3878202a8e41341ddcf7bee0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 14:30:25 +0100 Subject: Actually encode dicts as json in the DB --- synapse/storage/__init__.py | 4 ++-- synapse/storage/_base.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d38d613450..cd9acdc447 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -81,11 +81,11 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, "event_type", event.type, "sender": event.user_id, "room_id": event.room_id, - "content": event.content, + "content": json.dumps(event.content), } unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} - val["unrecognized_keys"] = unrec + val["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 489b6bd171..5cb26ad6db 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -288,7 +288,8 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row.items() if v}) - d.update(json.loads(row["unrecognized_keys"])) + d.update(json.loads(json.loads(row["unrecognized_keys"]))) + d["content"] = json.loads(d["content"}) del d["unrecognized_keys"] return self.event_factory.create_event( -- cgit 1.5.1 From 2529f2bc01781314ecdedd69e272c737ba1a71f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 16:58:51 +0100 Subject: Rename _execute_query --- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/feedback.py | 2 +- synapse/storage/roommember.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index cd9acdc447..75d93f7111 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -119,7 +119,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, else: args = (room_id, ) - results = yield self._execute_query(sql, *args) + results = yield self._execute_and_decode(sql, *args) defer.returnValue( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index befeb55b25..7fef8601e7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -71,7 +71,7 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(interaction) - def _execut_query(self, query, *args): + def _execute_and_decode(self, query, *args): return self._execute(self.cursor_to_dict, *args) # "Simple" SQL API methods that operate on a single table with no JOINs, diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index b9e792c120..dd5f3fbc10 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -39,7 +39,7 @@ class FeedbackStore(SQLBaseStore): "WHERE feedback.target_event_id = ? " ) - rows = yield self._execute_query(sql, event_id) + rows = yield self._execute_and_decode(sql, event_id) defer.returnValue( [ diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 60296380e6..c99cefbcfc 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -141,6 +141,6 @@ class RoomMemberStore(SQLBaseStore): "WHERE %s " ) % (where_clause,) - rows = yield self._execute_query(sql, where_values) + rows = yield self._execute_and_decode(sql, where_values) results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) -- cgit 1.5.1 From 78b501eba68f93c91f53ddf179ebde32f511894f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 17:09:28 +0100 Subject: Fix typo --- synapse/storage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 75d93f7111..afdd75f46d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -121,7 +121,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, results = yield self._execute_and_decode(sql, *args) - defer.returnValue( + defer.returnValue([self._parse_event_from_row(r) for r in results]) def schema_path(schema): -- cgit 1.5.1 From 661c7117659118ed977f56a092525dbdae9dc67c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 17:34:37 +0100 Subject: Start fixing places that use the data store. --- synapse/handlers/room.py | 17 ++++------------- synapse/rest/room.py | 39 +++++++++++++++++++++------------------ synapse/storage/__init__.py | 8 +++----- synapse/storage/_base.py | 2 +- synapse/storage/feedback.py | 4 +++- synapse/storage/roommember.py | 2 +- 6 files changed, 33 insertions(+), 39 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index eae40765b3..a9ff2d93f1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -141,12 +141,7 @@ class MessageHandler(BaseHandler): yield self.state_handler.handle_new_event(event) # store in db - store_id = yield self.store.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + store_id = yield self.store.persist_event(event) event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id @@ -201,19 +196,15 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_room_data(room_id, event_type, state_key) + data = yield self.store.get_current_state(room_id, event_type, state_key) defer.returnValue(data) @defer.inlineCallbacks - def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None, - user_id=None, fb_sender_id=None, fb_type=None): + def get_feedback(self, event_id): yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db - fb = yield self.store.get_feedback( - room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, fb_type=fb_type - ) + fb = yield self.store.get_feedback(event_id) if fb: defer.returnValue(fb) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 1fc0c996b8..3f153df8e3 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -285,25 +285,28 @@ class FeedbackRestServlet(RestServlet): feedback_type): user = yield (self.auth.get_user_by_req(request)) - if feedback_type not in Feedback.LIST: - raise SynapseError(400, "Bad feedback type.", - errcode=Codes.BAD_JSON) - - msg_handler = self.handlers.message_handler - feedback = yield msg_handler.get_feedback( - room_id=urllib.unquote(room_id), - msg_sender_id=msg_sender_id, - msg_id=msg_id, - user_id=user.to_string(), - fb_sender_id=fb_sender_id, - fb_type=feedback_type - ) - - if not feedback: - raise SynapseError(404, "Feedback not found.", - errcode=Codes.NOT_FOUND) + # TODO (erikj): Implement this? + raise NotImplementedError("Getting feedback is not supported") - defer.returnValue((200, json.loads(feedback.content))) +# if feedback_type not in Feedback.LIST: +# raise SynapseError(400, "Bad feedback type.", +# errcode=Codes.BAD_JSON) +# +# msg_handler = self.handlers.message_handler +# feedback = yield msg_handler.get_feedback( +# room_id=urllib.unquote(room_id), +# msg_sender_id=msg_sender_id, +# msg_id=msg_id, +# user_id=user.to_string(), +# fb_sender_id=fb_sender_id, +# fb_type=feedback_type +# ) +# +# if not feedback: +# raise SynapseError(404, "Feedback not found.", +# errcode=Codes.NOT_FOUND) +# +# defer.returnValue((200, json.loads(feedback.content))) @defer.inlineCallbacks def on_PUT(self, request, room_id, sender_id, msg_id, fb_sender_id, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index afdd75f46d..182b6ebadd 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -21,13 +21,11 @@ from synapse.api.events.room import ( from .directory import DirectoryStore from .feedback import FeedbackStore -from .message import MessageStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore -from .roomdata import RoomDataStore from .stream import StreamStore from .pdu import StatePduStore, PduStore from .transactions import TransactionStore @@ -36,7 +34,7 @@ import json import os -class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, +class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, DirectoryStore): @@ -78,7 +76,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, def _store_event(self, event): vals = { "event_id": event.event_id, - "event_type", event.type, + "event_type": event.type, "sender": event.user_id, "room_id": event.room_id, "content": json.dumps(event.content), @@ -105,7 +103,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, # TODO (erikj): We also need to update the current state table? @defer.inlineCallbacks - def get_current_state(room_id, event_type=None, state_key="") + def get_current_state(room_id, event_type=None, state_key=""): sql = ( "SELECT e.* FROM events as e" "INNER JOIN current_state as c ON e.event_id = c.event_id " diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7fef8601e7..533f509709 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -293,7 +293,7 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row.items() if v}) d.update(json.loads(json.loads(row["unrecognized_keys"]))) - d["content"] = json.loads(d["content"}) + d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] return self.event_factory.create_event( diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index dd5f3fbc10..e60f98d1e1 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table from synapse.api.events.room import FeedbackEvent @@ -31,7 +33,7 @@ class FeedbackStore(SQLBaseStore): "sender": event.user_id, }) - @defer.inlineCallback + @defer.inlineCallbacks def get_feedback_for_event(self, event_id): sql = ( "SELECT events.* FROM events INNER JOIN feedback " diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c99cefbcfc..14c0152e8a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -112,7 +112,7 @@ class RoomMemberStore(SQLBaseStore): args = [user_id] args.extend(membership_list) - where_clause "user_id = ? AND (%s)" % ( + where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) -- cgit 1.5.1 From 2c46bb620828efaebdbae37e5212a28b505ee72d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 18:40:50 +0100 Subject: Fix up typos and correct sql queries --- synapse/handlers/room.py | 10 ++-------- synapse/storage/__init__.py | 20 ++++++++++---------- synapse/storage/_base.py | 9 +++++---- synapse/storage/roommember.py | 26 +++++++++++++------------- synapse/storage/schema/im.sql | 4 ++-- synapse/storage/stream.py | 11 ++++------- 6 files changed, 36 insertions(+), 44 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a9ff2d93f1..9b55206e47 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -201,7 +201,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_feedback(self, event_id): - yield self.auth.check_joined_room(room_id, user_id) + # yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db fb = yield self.store.get_feedback(event_id) @@ -690,13 +690,7 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): # store membership - store_id = yield self.store.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=membership - ) + store_id = yield self.store.persist_event(event) # Send a PDU to all hosts who have joined the room. destinations = yield self.store.get_joined_hosts_for_room( diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 182b6ebadd..f41c21dcd2 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, @@ -52,7 +53,7 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomConfigEvent.TYPE: yield self._store_room_config(event) - self._store_event(event) + yield self._store_event(event) @defer.inlineCallbacks def get_event(self, event_id): @@ -76,14 +77,13 @@ class DataStore(RoomMemberStore, RoomStore, def _store_event(self, event): vals = { "event_id": event.event_id, - "event_type": event.type, - "sender": event.user_id, + "type": event.type, "room_id": event.room_id, "content": json.dumps(event.content), } - unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} - val["unrecognized_keys"] = json.dumps(unrec) + unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()} + vals["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) @@ -91,7 +91,7 @@ class DataStore(RoomMemberStore, RoomStore, vals = { "event_id": event.event_id, "room_id": event.room_id, - "event_type": event.event_type, + "type": event.type, "state_key": event.state_key, } @@ -103,16 +103,16 @@ class DataStore(RoomMemberStore, RoomStore, # TODO (erikj): We also need to update the current state table? @defer.inlineCallbacks - def get_current_state(room_id, event_type=None, state_key=""): + def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( - "SELECT e.* FROM events as e" - "INNER JOIN current_state as c ON e.event_id = c.event_id " + "SELECT e.* FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " "INNER JOIN state_events as s ON e.event_id = s.event_id " "WHERE c.room_id = ? " ) if event_type: - sql += " s.type = ? AND s.state_key = ? " + sql += " AND s.type = ? AND s.state_key = ? " args = (room_id, event_type, state_key) else: args = (room_id, ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 533f509709..c8ec63f30a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError import collections +import copy import json @@ -59,7 +60,7 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ + "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ if decoder else None ) def interaction(txn): @@ -72,7 +73,7 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(interaction) def _execute_and_decode(self, query, *args): - return self._execute(self.cursor_to_dict, *args) + return self._execute(self.cursor_to_dict, query, *args) # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -291,8 +292,8 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(func) def _parse_event_from_row(self, row_dict): - d = copy.deepcopy({k: v for k, v in row.items() if v}) - d.update(json.loads(json.loads(row["unrecognized_keys"]))) + d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + d.update(json.loads(json.loads(row_dict["unrecognized_keys"]))) d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 14c0152e8a..8c4b04f190 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -54,13 +54,13 @@ class RoomMemberStore(SQLBaseStore): "INSERT OR IGNORE INTO room_hosts (room_id, host) " "VALUES (?, ?)" ) - yield self._execute(None, sql, room_id, domain) + yield self._execute(None, sql, event.room_id, domain) else: sql = ( "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" ) - yield self._execute(None, sql, room_id, domain) + yield self._execute(None, sql, event.room_id, domain) def get_room_member(self, user_id, room_id): @@ -72,10 +72,10 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - return self._get_members_by_dict( - room_id=room_id, - user_id=user_id - ) + return self._get_members_by_dict({ + "e.room_id": room_id, + "m.user_id": user_id, + }) def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -89,11 +89,11 @@ class RoomMemberStore(SQLBaseStore): list of namedtuples representing the members in this room. """ - where = {"room_id": room_id} + where = {"m.room_id": room_id} if membership: - where["membership"] = membership + where["m.membership"] = membership - return self._get_members_by_dict(**membership) + return self._get_members_by_dict(where) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -126,8 +126,8 @@ class RoomMemberStore(SQLBaseStore): ) def _get_members_by_dict(self, where_dict): - clause = " AND ".join("%s = ?" % k for k in where.keys()) - vals = where.values() + clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) + vals = where_dict.values() return self._get_members_query(clause, vals) @defer.inlineCallbacks @@ -136,11 +136,11 @@ class RoomMemberStore(SQLBaseStore): "SELECT e.* FROM events as e " "INNER JOIN room_memberships as m " "ON e.event_id = m.event_id " - "INNER JOIN current_state as c " + "INNER JOIN current_state_events as c " "ON m.event_id = c.event_id " "WHERE %s " ) % (where_clause,) - rows = yield self._execute_and_decode(sql, where_values) + rows = yield self._execute_and_decode(sql, *where_values) results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 7f564c8540..85c0c7119c 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -30,9 +30,9 @@ CREATE TABLE IF NOT EXISTS state_events( prev_state TEXT ); -CREATE TABLE IF NOT EXISTS current_state( +CREATE TABLE IF NOT EXISTS current_state_events( event_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1456d216f0..9937239c22 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -13,12 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from ._base import SQLBaseStore -from .message import MessagesTable -from .feedback import FeedbackTable -from .roomdata import RoomDataTable -from .roommember import RoomMemberTable from synapse.api.constants import Membership @@ -40,13 +37,13 @@ class StreamStore(SQLBaseStore): current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state as c ON m.event_id = c.event_id " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ?" ) invites_sql = ( "SELECT m.event_id FROM room_membershipas as m " - "INNER JOIN current_state as c ON m.event_id = c.event_id " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ? AND m.membership = ?" ) @@ -71,4 +68,4 @@ class StreamStore(SQLBaseStore): user_id, user_id, Membership.INVITE ) - defer.returnValue([self._parse_event_from_row(r) for r in results]) + defer.returnValue([self._parse_event_from_row(r) for r in rows]) -- cgit 1.5.1 From 5002efa31bb57a92b87b9d7319641d9b5a2a6047 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 10:26:35 +0100 Subject: Reimplement the get public rooms api to work with new DB schema --- synapse/api/events/factory.py | 3 +- synapse/api/events/room.py | 23 +++++++++++ synapse/handlers/room.py | 2 +- synapse/storage/__init__.py | 6 ++- synapse/storage/_base.py | 2 +- synapse/storage/room.py | 90 +++++++++++++++++++++++++++---------------- synapse/storage/schema/im.sql | 12 ++++++ 7 files changed, 101 insertions(+), 37 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 12aa04fc6e..23d2b0401c 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -15,7 +15,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, - InviteJoinEvent, RoomConfigEvent + InviteJoinEvent, RoomConfigEvent, RoomNameEvent, ) from synapse.util.stringutils import random_string @@ -25,6 +25,7 @@ class EventFactory(object): _event_classes = [ RoomTopicEvent, + RoomNameEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index f3df849af2..8136d495d5 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -19,14 +19,37 @@ from . import SynapseEvent class RoomTopicEvent(SynapseEvent): TYPE = "m.room.topic" + internal_keys = SynapseEvent.internal_keys + [ + "topic", + ] + def __init__(self, **kwargs): kwargs["state_key"] = "" + if "topic" in kwargs["content"]: + kwargs["topic"] = kwargs["content"]["topic"] super(RoomTopicEvent, self).__init__(**kwargs) def get_content_template(self): return {"topic": u"string"} +class RoomNameEvent(SynapseEvent): + TYPE = "m.room.name" + + internal_keys = SynapseEvent.internal_keys + [ + "name", + ] + + def __init__(self, **kwargs): + kwargs["state_key"] = "" + if "name" in kwargs["content"]: + kwargs["name"] = kwargs["content"]["name"] + super(RoomNameEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {"name": u"string"} + + class RoomMemberEvent(SynapseEvent): TYPE = "m.room.member" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9b55206e47..5c1b59dbc9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -790,5 +790,5 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): - chunk = yield self.store.get_rooms(is_public=True, with_topics=True) + chunk = yield self.store.get_rooms(is_public=True) defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f41c21dcd2..f62cee3c39 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, - RoomConfigEvent + RoomConfigEvent, RoomNameEvent, ) from .directory import DirectoryStore @@ -52,6 +52,10 @@ class DataStore(RoomMemberStore, RoomStore, yield self._store_feedback(event) elif event.type == RoomConfigEvent.TYPE: yield self._store_room_config(event) + elif event.type == RoomNameEvent.TYPE: + yield self._store_room_name(event) + elif event.type == RoomTopicEvent.TYPE: + yield self._store_room_topic(event) yield self._store_event(event) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c8ec63f30a..c26e9a0f98 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -68,7 +68,7 @@ class SQLBaseStore(object): if decoder: return decoder(cursor) else: - return cursor + return cursor.fetchall() return self._db_pool.runInteraction(interaction) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a97162831b..8f44b67d8c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -76,49 +76,73 @@ class RoomStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_rooms(self, is_public, with_topics): + def get_rooms(self, is_public): """Retrieve a list of all public rooms. Args: is_public (bool): True if the rooms returned should be public. - with_topics (bool): True to include the current topic for the room - in the response. Returns: - A list of room dicts containing at least a "room_id" key, and a - "topic" key if one is set and with_topic=True. + A list of room dicts containing at least a "room_id" key, a + "topic" key if one is set, and a "name" key if one is set """ - room_data_type = RoomTopicEvent.TYPE - public = 1 if is_public else 0 - - latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE " - + "room_data.type = ? GROUP BY room_id") - - query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms " - + "LEFT JOIN " - + "room_aliases ON room_aliases.room_id = rooms.room_id " - + "LEFT JOIN " - + "room_data ON rooms.room_id = room_data.room_id WHERE " - + "(room_data.id IN (" + latest_topic + ") " - + "OR room_data.id IS NULL) AND rooms.is_public = ?") - - res = yield self._execute( - self.cursor_to_dict, query, room_data_type, public + + topic_subquery = ( + "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics " + "INNER JOIN current_state_events as c " + "ON c.event_id = topics.event_id " ) - # return only the keys the specification expects - ret_keys = ["room_id", "topic", "room_alias"] + name_subquery = ( + "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names " + "INNER JOIN current_state_events as c " + "ON c.event_id = room_names.event_id " + ) - # extract topic from the json (icky) FIXME - for i, room_row in enumerate(res): - try: - content_json = json.loads(room_row["content"]) - room_row["topic"] = content_json["topic"] - except: - pass # no topic set - # filter the dict based on ret_keys - res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys} + sql = ( + "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r " + "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " + "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " + "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " + "WHERE r.is_public = ? " + "GROUP BY r.room_id " + ) % { + "topic": topic_subquery, + "name": name_subquery, + } + + rows = yield self._execute(None, sql, is_public) + + ret = [ + { + "room_id": r[0], + "name": r[1], + "topic": r[2], + "aliases": r[3].split(","), + } + for r in rows + ] + + defer.returnValue(ret) + + def _store_room_topic(self, event): + return self._simple_insert( + "topics", + { + "event_id": event.event_id, + "room_id": event.room_id, + "topic": event.topic, + } + ) - defer.returnValue(res) + def _store_room_name(self, event): + return self._simple_insert( + "room_names", + { + "event_id": event.event_id, + "room_id": event.room_id, + "name": event.name, + } + ) class RoomsTable(Table): diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 85c0c7119c..9a0f2145d5 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -51,6 +51,18 @@ CREATE TABLE IF NOT EXISTS feedback( room_id TEXT ); +CREATE TABLE IF NOT EXISTS topics( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + topic TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS room_names( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + name TEXT NOT NULL +); + CREATE TABLE IF NOT EXISTS rooms( room_id TEXT PRIMARY KEY NOT NULL, is_public INTEGER, -- cgit 1.5.1 From 114984a2361ee41005a769f6dc127c470ee08aee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 13:58:28 +0100 Subject: Start chagning the events stream to work with the new DB schema --- synapse/api/streams/event.py | 77 +++-------------------------------------- synapse/handlers/events.py | 8 ++--- synapse/handlers/room.py | 79 +++++++++++++++++++++++-------------------- synapse/storage/__init__.py | 10 +++++- synapse/storage/_base.py | 2 +- synapse/storage/roommember.py | 8 ++++- synapse/storage/schema/im.sql | 5 ++- synapse/storage/stream.py | 31 +++++++++++++++++ 8 files changed, 102 insertions(+), 118 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 4b6d739e54..427363cad4 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -28,17 +28,17 @@ import logging logger = logging.getLogger(__name__) -class MessagesStreamData(StreamData): - EVENT_TYPE = MessageEvent.TYPE +class EventsStreamData(StreamData): + EVENT_TYPE = "EventsStream" def __init__(self, hs, room_id=None, feedback=False): - super(MessagesStreamData, self).__init__(hs) + super(EventsStreamData, self).__init__(hs) self.room_id = room_id self.with_feedback = feedback @defer.inlineCallbacks def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_message_stream( + data, latest_ver = yield self.store.get_room_events_stream( user_id=user_id, from_key=from_key, to_key=to_key, @@ -50,74 +50,7 @@ class MessagesStreamData(StreamData): @defer.inlineCallbacks def max_token(self): - val = yield self.store.get_max_message_id() - defer.returnValue(val) - - -class RoomMemberStreamData(StreamData): - EVENT_TYPE = RoomMemberEvent.TYPE - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_member_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key - ) - - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_member_id() - defer.returnValue(val) - - -class FeedbackStreamData(StreamData): - EVENT_TYPE = FeedbackEvent.TYPE - - def __init__(self, hs, room_id=None): - super(FeedbackStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_feedback_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_feedback_id() - defer.returnValue(val) - - -class RoomDataStreamData(StreamData): - EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types - - def __init__(self, hs, room_id=None): - super(RoomDataStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_data_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_data_id() + val = yield self.store.get_room_events_max_id() defer.returnValue(val) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 3af7d824a2..6bb797caf2 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,8 +17,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.streams.event import ( - EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData, - RoomDataStreamData + EventStream, EventsStreamData ) from synapse.handlers.presence import PresenceStreamData @@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData class EventStreamHandler(BaseHandler): stream_data_classes = [ - MessagesStreamData, - RoomMemberStreamData, - FeedbackStreamData, - RoomDataStreamData, + EventsStreamData, PresenceStreamData, ] diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 432d13982a..3451250008 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, MessagesStreamData +from synapse.api.streams.event import EventStream, EventsStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -97,30 +97,30 @@ class MessageHandler(BaseHandler): self.notifier.on_new_room_event(event, store_id) yield self.hs.get_federation().handle_new_event(event) - - @defer.inlineCallbacks - def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False): - """Get messages in a room. - - Args: - user_id (str): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - feedback (bool): True to get compressed feedback with the messages - Returns: - dict: Pagination API results - """ - yield self.auth.check_joined_room(room_id, user_id) - - data_source = [MessagesStreamData(self.hs, room_id=room_id, - feedback=feedback)] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) - +# +# @defer.inlineCallbacks +# def get_messages(self, user_id=None, room_id=None, pagin_config=None, +# feedback=False): +# """Get messages in a room. +# +# Args: +# user_id (str): The user requesting messages. +# room_id (str): The room they want messages from. +# pagin_config (synapse.api.streams.PaginationConfig): The pagination +# config rules to apply, if any. +# feedback (bool): True to get compressed feedback with the messages +# Returns: +# dict: Pagination API results +# """ +# yield self.auth.check_joined_room(room_id, user_id) +# +# data_source = [MessagesStreamData(self.hs, room_id=room_id, +# feedback=feedback)] +# event_stream = EventStream(user_id, data_source) +# pagin_config = yield event_stream.fix_tokens(pagin_config) +# data_chunk = yield event_stream.get_chunk(config=pagin_config) +# defer.returnValue(data_chunk) +# @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): """ Stores data for a room. @@ -251,20 +251,27 @@ class MessageHandler(BaseHandler): user_id=user_id, membership_list=[Membership.INVITE, Membership.JOIN] ) - for room_info in room_list: - if room_info["membership"] != Membership.JOIN: + + ret = [] + + for event in room_list: + d = event.get_dict() + ret.append(d) + + if event.membership != Membership.JOIN: continue try: - event_chunk = yield self.get_messages( - user_id=user_id, - pagin_config=pagin_config, - feedback=feedback, - room_id=room_info["room_id"] + messages = yield self.store.get_recent_events_for_room( + event.room_id, + limit=50, ) - room_info["messages"] = event_chunk + d["messages"] = [m.get_dict() for m in messages] except: pass - defer.returnValue(room_list) + + logger.debug("snapshot_all_rooms returning: %s", ret) + + defer.returnValue(ret) class RoomCreationHandler(BaseHandler): @@ -442,7 +449,7 @@ class RoomMemberHandler(BaseHandler): member_list = yield self.store.get_room_members(room_id=room_id) event_list = [ - entry.as_event(self.event_factory).get_dict() + entry.get_dict() for entry in member_list ] chunk_data = { @@ -685,7 +692,7 @@ class RoomMemberHandler(BaseHandler): user_id=user.to_string(), membership_list=membership_list ) - defer.returnValue([r["room_id"] for r in rooms]) + defer.returnValue([r.room_id for r in rooms]) @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f62cee3c39..46b9dbcbbf 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -104,7 +104,15 @@ class DataStore(RoomMemberStore, RoomStore, yield self._simple_insert("state_events", vals) - # TODO (erikj): We also need to update the current state table? + yield self._simple_insert( + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + ) @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c26e9a0f98..413838f798 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -293,7 +293,7 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) - d.update(json.loads(json.loads(row_dict["unrecognized_keys"]))) + d.update(json.loads(row_dict["unrecognized_keys"])) d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8c4b04f190..a0620677b9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -63,6 +63,7 @@ class RoomMemberStore(SQLBaseStore): yield self._execute(None, sql, event.room_id, domain) + @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -72,11 +73,13 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - return self._get_members_by_dict({ + rows = yield self._get_members_by_dict({ "e.room_id": room_id, "m.user_id": user_id, }) + defer.returnValue(rows[0] if rows else None) + def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -142,5 +145,8 @@ class RoomMemberStore(SQLBaseStore): ) % (where_clause,) rows = yield self._execute_and_decode(sql, *where_values) + + logger.debug("_get_members_query Got rows %s", rows) + results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 9a0f2145d5..2452890ea4 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -32,7 +32,10 @@ CREATE TABLE IF NOT EXISTS state_events( CREATE TABLE IF NOT EXISTS current_state_events( event_id TEXT NOT NULL, - room_id TEXT NOT NULL + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 9937239c22..c5c3770a40 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -34,6 +34,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): + # TODO (erikj): Handle compressed feedback current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " @@ -69,3 +70,33 @@ class StreamStore(SQLBaseStore): ) defer.returnValue([self._parse_event_from_row(r) for r in rows]) + + @defer.inlineCallbacks + def get_recent_events_for_room(self, room_id, limit, with_feedback=False): + # TODO (erikj): Handle compressed feedback + + sql = ( + "SELECT * FROM events WHERE room_id = ? " + "ORDER BY ordering DESC LIMIT ? " + ) + + rows = yield self._execute_and_decode( + sql, + room_id, limit + ) + + rows.reverse() # As we selected with reverse ordering + + defer.returnValue([self._parse_event_from_row(r) for r in rows]) + + @defer.inlineCallbacks + def get_room_events_max_id(self): + res = yield self._execute_and_decode( + "SELECT MAX(ordering) as m FROM events" + ) + + if not res: + defer.returnValue(0) + return + + defer.returnValue(res[0]["m"]) -- cgit 1.5.1 From 01f089d9fbb9b89fa143ac44e51529fa8ed7ec12 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 15:28:54 +0100 Subject: Correctly return new token when returning events. Serialize events correctly. --- synapse/api/notifier.py | 3 ++- synapse/api/streams/event.py | 2 +- synapse/handlers/room.py | 5 ++++- synapse/storage/__init__.py | 6 +++++- synapse/storage/stream.py | 18 +++++++++++++----- 5 files changed, 25 insertions(+), 9 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py index 65b5a4ebb3..9f622df6bb 100644 --- a/synapse/api/notifier.py +++ b/synapse/api/notifier.py @@ -15,6 +15,7 @@ from synapse.api.constants import Membership from synapse.api.events.room import RoomMemberEvent +from synapse.api.streams.event import EventsStreamData from twisted.internet import defer from twisted.internet import reactor @@ -66,7 +67,7 @@ class Notifier(object): self._notify_and_callback( user_id=user_id, event_data=event.get_dict(), - stream_type=event.type, + stream_type=EventsStreamData.EVENT_TYPE, store_id=store_id) def on_new_user_event(self, user_id, event_data, stream_type, store_id): diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 427363cad4..895a96b5b9 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -160,7 +160,7 @@ class EventStream(PaginationStream): self.user_id, from_pkey, to_pkey, limit ) - chunk += event_chunk + chunk += [e.get_dict() for e in event_chunk] next_ver.append(str(max_pkey)) defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3451250008..9261984b7e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -255,7 +255,10 @@ class MessageHandler(BaseHandler): ret = [] for event in room_list: - d = event.get_dict() + d = { + "room_id": event.room_id, + "membership": event.membership, + } ret.append(d) if event.membership != Membership.JOIN: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 46b9dbcbbf..750e86040e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,8 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomTopicEvent.TYPE: yield self._store_room_topic(event) - yield self._store_event(event) + ret = yield self._store_event(event) + defer.returnValue(ret) @defer.inlineCallbacks def get_event(self, event_id): @@ -114,6 +115,9 @@ class DataStore(RoomMemberStore, RoomStore, } ) + latest = yield self.get_room_events_max_id() + defer.returnValue(latest) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c5c3770a40..1300aee8b0 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,7 @@ class StreamStore(SQLBaseStore): ) invites_sql = ( - "SELECT m.event_id FROM room_membershipas as m " + "SELECT m.event_id FROM room_memberships as m " "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ? AND m.membership = ?" ) @@ -55,8 +55,9 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events as e WHERE " - "(room_id IN (%(current)s)) OR " - "(event_id IN (%(invites)s)) " + "((room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s))) " + " AND e.ordering > ? AND e.ordering < ? " "ORDER BY ordering ASC LIMIT %(limit)d" ) % { "current": current_room_membership_sql, @@ -66,10 +67,17 @@ class StreamStore(SQLBaseStore): rows = yield self._execute_and_decode( sql, - user_id, user_id, Membership.INVITE + user_id, user_id, Membership.INVITE, from_key, to_key ) - defer.returnValue([self._parse_event_from_row(r) for r in rows]) + ret = [self._parse_event_from_row(r) for r in rows] + + if ret: + max_id = max([r["ordering"] for r in rows]) + else: + max_id = to_key + + defer.returnValue((ret, max_id)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): -- cgit 1.5.1 From d260a42ca279fbca46f85b2c96bddc4f814ecef3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:17:36 +0100 Subject: PEP8 cleanups --- synapse/api/events/room.py | 1 + synapse/handlers/room.py | 17 ++++++++++------- synapse/storage/__init__.py | 6 +++++- synapse/storage/_base.py | 3 ++- synapse/storage/room.py | 15 +++++++++++---- synapse/storage/roommember.py | 1 - synapse/storage/stream.py | 2 -- 7 files changed, 29 insertions(+), 16 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index d9b3d572fa..dbf537fb88 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -20,6 +20,7 @@ class GenericEvent(SynapseEvent): def get_content_template(self): return {} + class RoomTopicEvent(SynapseEvent): TYPE = "m.room.topic" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b0b2441b9f..4ecfb278be 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -114,8 +114,9 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [EventsStreamData(self.hs, room_id=room_id, - feedback=feedback)] + data_source = [ + EventsStreamData(self.hs, room_id=room_id, feedback=feedback) + ] event_stream = EventStream(user_id, data_source) pagin_config = yield event_stream.fix_tokens(pagin_config) data_chunk = yield event_stream.get_chunk(config=pagin_config) @@ -196,7 +197,9 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_current_state(room_id, event_type, state_key) + data = yield self.store.get_current_state( + room_id, event_type, state_key + ) defer.returnValue(data) @defer.inlineCallbacks @@ -496,7 +499,7 @@ class RoomMemberHandler(BaseHandler): SynapseError if there was a problem changing the membership. """ - #broadcast_msg = False + # broadcast_msg = False prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id @@ -570,7 +573,8 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True): + def _do_join(self, event, room_host=None, do_auth=True, + broadcast_msg=True): joinee = self.hs.parse_userid(event.target_user_id) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -621,7 +625,6 @@ class RoomMemberHandler(BaseHandler): broadcast_msg=broadcast_msg, ) - if should_do_dance: yield self._do_invite_join_dance( room_id=room_id, @@ -755,7 +758,7 @@ class RoomMemberHandler(BaseHandler): room_id, "", is_public=False ) - #yield self.state_handler.handle_new_event(event) + # yield self.state_handler.handle_new_event(event) yield federation.handle_new_event(new_event) yield federation.get_state_for_room( target_host, room_id diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 750e86040e..ad36d14a36 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -87,7 +87,11 @@ class DataStore(RoomMemberStore, RoomStore, "content": json.dumps(event.content), } - unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()} + unrec = { + k: v + for k, v in event.get_full_dict().items() + if k not in vals.keys() + } vals["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 413838f798..36cc57c1b8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,7 +60,8 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ if decoder else None + "[SQL] %s Args=%s Func=%s", + query, args, decoder.__name__ if decoder else None ) def interaction(txn): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8f44b67d8c..22f2dcca45 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -87,19 +87,26 @@ class RoomStore(SQLBaseStore): """ topic_subquery = ( - "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics " + "SELECT topics.event_id as event_id, " + "topics.room_id as room_id, topic " + "FROM topics " "INNER JOIN current_state_events as c " "ON c.event_id = topics.event_id " ) name_subquery = ( - "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names " + "SELECT room_names.event_id as event_id, " + "room_names.room_id as room_id, name " + "FROM room_names " "INNER JOIN current_state_events as c " "ON c.event_id = room_names.event_id " ) + # We use non printing ascii character US () as a seperator sql = ( - "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r " + "SELECT r.room_id, n.name, t.topic, " + "group_concat(a.room_alias, '') " + "FROM rooms AS r " "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " @@ -117,7 +124,7 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split(","), + "aliases": r[3].split(""), } for r in rows ] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index a0620677b9..89c87290cf 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -62,7 +62,6 @@ class RoomMemberStore(SQLBaseStore): yield self._execute(None, sql, event.room_id, domain) - @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7c2c45e0ff..cf4b1682b6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -61,7 +61,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], to_key)) return - sql = ( "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " @@ -90,7 +89,6 @@ class StreamStore(SQLBaseStore): ret = [self._parse_event_from_row(r) for r in rows] - if rows: if from_key < to_key: key = max([r["ordering"] for r in rows]) -- cgit 1.5.1 From 506711749f674c2b408f4034662240883e0e2764 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:45:16 +0100 Subject: We no longer need to special case room config events. --- synapse/storage/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index ad36d14a36..841ad8f132 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -50,8 +50,8 @@ class DataStore(RoomMemberStore, RoomStore, yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: yield self._store_feedback(event) - elif event.type == RoomConfigEvent.TYPE: - yield self._store_room_config(event) +# elif event.type == RoomConfigEvent.TYPE: +# yield self._store_room_config(event) elif event.type == RoomNameEvent.TYPE: yield self._store_room_name(event) elif event.type == RoomTopicEvent.TYPE: -- cgit 1.5.1 From fc26275bb34206f48d70c7effcbc6f5d0bf86ecb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 15:50:41 +0100 Subject: Add two different columns for ordering the events table, one which can be used for pagination and one which can be as tokens for notifying clients. Also add a 'processed' field which is currently always set to True --- synapse/federation/handler.py | 4 ++-- synapse/storage/__init__.py | 18 ++++++++++++++---- synapse/storage/schema/im.sql | 13 ++++++++----- synapse/storage/stream.py | 17 +++++++++-------- 4 files changed, 33 insertions(+), 19 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 580e591aca..68243d31d0 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -63,7 +63,7 @@ class FederationEventHandler(object): Deferred: Resolved when it has successfully been queued for processing. """ - yield self._fill_out_prev_events(event) + yield self.fill_out_prev_events(event) pdu = self.pdu_codec.pdu_from_event(event) @@ -129,7 +129,7 @@ class FederationEventHandler(object): yield self.event_handler.on_receive(new_state_event) @defer.inlineCallbacks - def _fill_out_prev_events(self, event): + def fill_out_prev_events(self, event): if hasattr(event, "prev_events"): return diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 841ad8f132..9f78f3f9bd 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -43,9 +43,10 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) self.event_factory = hs.get_event_factory() + self.hs = hs @defer.inlineCallbacks - def persist_event(self, event): + def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: @@ -57,7 +58,7 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomTopicEvent.TYPE: yield self._store_room_topic(event) - ret = yield self._store_event(event) + ret = yield self._store_event(event, backfilled) defer.returnValue(ret) @defer.inlineCallbacks @@ -79,14 +80,23 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event) @defer.inlineCallbacks - def _store_event(self, event): + def _store_event(self, event, backfilled): + # FIXME (erikj): This should be removed when we start amalgamating + # event and pdu storage. + yield self.hs.get_federation().fill_out_prev_events(event) + vals = { + "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, "content": json.dumps(event.content), + "processed": True, } + if backfilled: + vals["token_ordering"] = "-1" + unrec = { k: v for k, v in event.get_full_dict().items() @@ -96,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore, yield self._simple_insert("events", vals) - if hasattr(event, "state_key"): + if not backfilled and hasattr(event, "state_key"): vals = { "event_id": event.event_id, "room_id": event.room_id, diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 2452890ea4..b0240e39af 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -14,12 +14,15 @@ */ CREATE TABLE IF NOT EXISTS events( - ordering INTEGER PRIMARY KEY AUTOINCREMENT, + token_ordering INTEGER AUTOINCREMENT, + topological_ordering INTEGER NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, - room_id TEXT, - content TEXT, - unrecognized_keys TEXT + room_id TEXT NOT NULL, + content TEXT NOT NULL, + unrecognized_keys TEXT, + processed BOOL NOT NULL, + CONSTRAINT ev_uniq UNIQUE (event_id) ); CREATE TABLE IF NOT EXISTS state_events( @@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS current_state_events( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE + CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index cf4b1682b6..f7968f576f 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -73,13 +73,14 @@ class StreamStore(SQLBaseStore): # Constraints and ordering depend on direction. if from_key < to_key: sql += ( - "AND e.ordering > ? AND e.ordering < ? " - "ORDER BY ordering ASC LIMIT %(limit)d " + "AND e.token_ordering > ? AND e.token_ordering < ? " + "ORDER BY token_ordering, rowid ASC LIMIT %(limit)d " ) % {"limit": limit} else: sql += ( - "AND e.ordering < ? AND e.ordering > ? " - "ORDER BY ordering DESC LIMIT %(limit)d " + "AND e.token_ordering < ? " + "AND e.token_ordering > ? " + "ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d " ) % {"limit": int(limit)} rows = yield self._execute_and_decode( @@ -91,9 +92,9 @@ class StreamStore(SQLBaseStore): if rows: if from_key < to_key: - key = max([r["ordering"] for r in rows]) + key = max([r["token_ordering"] for r in rows]) else: - key = min([r["ordering"] for r in rows]) + key = min([r["token_ordering"] for r in rows]) else: key = to_key @@ -105,7 +106,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events WHERE room_id = ? " - "ORDER BY ordering DESC LIMIT ? " + "ORDER BY token_ordering, rowid DESC LIMIT ? " ) rows = yield self._execute_and_decode( @@ -120,7 +121,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_max_id(self): res = yield self._execute_and_decode( - "SELECT MAX(ordering) as m FROM events" + "SELECT MAX(token_ordering) as m FROM events" ) if not res: -- cgit 1.5.1 From 709a92cee89709811c51cac7d8c66922093be673 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 16:00:46 +0100 Subject: SQL doesn't allow AUTOINCREMENT on non PRIMARY KEY columns. --- synapse/storage/__init__.py | 21 +++++++++++++++++++-- synapse/storage/schema/im.sql | 2 +- tests/rest/test_presence.py | 2 ++ tests/rest/test_profile.py | 1 + 4 files changed, 23 insertions(+), 3 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 9f78f3f9bd..b846081d49 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,6 +45,9 @@ class DataStore(RoomMemberStore, RoomStore, self.event_factory = hs.get_event_factory() self.hs = hs + self.min_token_deferred = self._get_min_token() + self.min_token = None + @defer.inlineCallbacks def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: @@ -82,7 +85,7 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks def _store_event(self, event, backfilled): # FIXME (erikj): This should be removed when we start amalgamating - # event and pdu storage. + # event and pdu storage yield self.hs.get_federation().fill_out_prev_events(event) vals = { @@ -95,7 +98,10 @@ class DataStore(RoomMemberStore, RoomStore, } if backfilled: - vals["token_ordering"] = "-1" + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + vals["token_ordering"] = self.min_token unrec = { k: v @@ -151,6 +157,17 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue([self._parse_event_from_row(r) for r in results]) + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + None, + "SELECT MIN(token_ordering) FROM events" + ) + + self.min_token = rows[0][0] if rows and rows[0] else 0 + + defer.returnValue(self.min_token) + def schema_path(schema): """ Get a filesystem path for the named database schema diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index b0240e39af..0fb3dbee55 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS events( - token_ordering INTEGER AUTOINCREMENT, + token_ordering INTEGER PRIMARY KEY AUTOINCREMENT, topological_ordering INTEGER NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, diff --git a/tests/rest/test_presence.py b/tests/rest/test_presence.py index 91d4d1ff6c..99287823e4 100644 --- a/tests/rest/test_presence.py +++ b/tests/rest/test_presence.py @@ -51,6 +51,7 @@ class PresenceStateTestCase(unittest.TestCase): hs = HomeServer("test", db_pool=None, http_client=None, + datastore=None, resource_for_client=self.mock_server, resource_for_federation=self.mock_server, ) @@ -109,6 +110,7 @@ class PresenceListTestCase(unittest.TestCase): hs = HomeServer("test", db_pool=None, http_client=None, + datastore=None, resource_for_client=self.mock_server, resource_for_federation=self.mock_server ) diff --git a/tests/rest/test_profile.py b/tests/rest/test_profile.py index ff1e92805e..ee47daf4c2 100644 --- a/tests/rest/test_profile.py +++ b/tests/rest/test_profile.py @@ -46,6 +46,7 @@ class ProfileTestCase(unittest.TestCase): resource_for_client=self.mock_server, federation=Mock(), replication_layer=Mock(), + datastore=None, ) def _get_user_by_token(token=None): -- cgit 1.5.1 From 75b6d982a01a431a89d2ab76d91a09159630d059 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:20:03 +0100 Subject: Add a 'backfill room' button --- synapse/federation/handler.py | 22 +++++++++++++++------- synapse/federation/replication.py | 9 ++++++--- synapse/handlers/federation.py | 21 ++++++++++++++++++--- synapse/rest/room.py | 16 ++++++++++++++++ synapse/storage/__init__.py | 22 ++++++++++++++++++---- synapse/storage/pdu.py | 16 ++++++++-------- 6 files changed, 81 insertions(+), 25 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 68243d31d0..984c1558e9 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -74,10 +74,18 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def backfill(self, room_id, limit): - # TODO: Work out which destinations to ask for backfill - # self.replication_layer.backfill(dest, room_id, limit) - pass + def backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) @log_function def get_state_for_room(self, destination, room_id): @@ -87,7 +95,7 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu): + def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it throught the StateHandler. """ @@ -95,7 +103,7 @@ class FederationEventHandler(object): try: with (yield self.lock_manager.lock(pdu.context)): - if event.is_state: + if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state( pdu ) @@ -104,7 +112,7 @@ class FederationEventHandler(object): else: is_new_state = False - yield self.event_handler.on_receive(event, is_new_state) + yield self.event_handler.on_receive(event, is_new_state, backfilled) except AuthError: # TODO: Implement something in federation that allows us to diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index bc9df2f214..3e5f1a4108 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -208,7 +208,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(pdu, backfilled=True) defer.returnValue(pdus) @@ -415,7 +415,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu): + def _handle_new_pdu(self, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -451,7 +451,10 @@ class ReplicationLayer(object): # Persist the Pdu, but don't mark it as processed yet. yield self.pdu_actions.persist_received(pdu) - ret = yield self.handler.on_receive_pdu(pdu) + if not backfilled: + ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + else: + ret = None yield self.pdu_actions.mark_as_processed(pdu) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7026df90a2..ef9ed274df 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -35,7 +35,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state): + def on_receive(self, event, is_new_state, backfilled): if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -70,6 +70,21 @@ class FederationHandler(BaseHandler): else: with (yield self.room_lock.lock(event.room_id)): - store_id = yield self.store.persist_event(event) + store_id = yield self.store.persist_event(event, backfilled) - yield self.notifier.on_new_room_event(event, store_id) + if not backfilled: + yield self.notifier.on_new_room_event(event, store_id) + + + @log_function + @defer.inlineCallbacks + def backfill(self, dest, room_id, limit): + events = yield self.hs.get_federation().backfill(dest, room_id, limit) + + for event in events: + try: + yield self.store.persist_event(event, backfilled=True) + except: + logger.debug("Failed to persiste event: %s", event) + + defer.returnValue(events) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index dfb2aabe70..89ea9f0d25 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -383,6 +383,21 @@ class RoomMessageListRestServlet(RestServlet): defer.returnValue((200, msgs)) +class RoomTriggerBackfill(RestServlet): + PATTERN = client_path_pattern("/rooms/(?P[^/]*)/backfill$") + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + remote_server = urllib.unquote(request.args["remote"][0]) + room_id = urllib.unquote(room_id) + limit = int(request.args["limit"][0]) + + handler = self.handlers.federation_handler + events = yield handler.backfill(remote_server, room_id, limit) + + res = [event.get_dict() for event in events] + defer.returnValue((200, res)) + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -403,3 +418,4 @@ def register_servlets(hs, http_server): RoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) + RoomTriggerBackfill(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b846081d49..2243a710db 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,6 +20,8 @@ from synapse.api.events.room import ( RoomConfigEvent, RoomNameEvent, ) +from synapse.util.logutils import log_function + from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore from .transactions import TransactionStore import json +import logging import os +logger = logging.getLogger(__name__) + + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, @@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token = None @defer.inlineCallbacks + @log_function def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: yield self._store_room_member(event) @@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event) @defer.inlineCallbacks + @log_function def _store_event(self, event, backfilled): # FIXME (erikj): This should be removed when we start amalgamating # event and pdu storage @@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore, if not self.min_token_deferred.called: yield self.min_token_deferred self.min_token -= 1 - vals["token_ordering"] = self.min_token + vals["stream_ordering"] = self.min_token unrec = { k: v @@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore, } vals["unrecognized_keys"] = json.dumps(unrec) - yield self._simple_insert("events", vals) + try: + yield self._simple_insert("events", vals) + except: + logger.exception("Failed to persist, probably duplicate") + return if not backfilled and hasattr(event, "state_key"): vals = { @@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore, def _get_min_token(self): row = yield self._execute( None, - "SELECT MIN(token_ordering) FROM events" + "SELECT MIN(stream_ordering) FROM events" ) - self.min_token = rows[0][0] if rows and rows[0] else 0 + self.min_token = min(row[0][0], -1) if row and row[0] else -1 + + logger.debug("min_token is: %s", self.min_token) defer.returnValue(self.min_token) diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index a24ce7ab78..7655f43ede 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table, JoinHelper from synapse.util.logutils import log_function @@ -319,6 +321,7 @@ class PduStore(SQLBaseStore): return [(row[0], row[1], row[2]) for row in results] + @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): """Get a list of Pdus that we haven't backfilled beyond yet (and haven't seen). This list is used when we want to backfill backwards and is the @@ -331,17 +334,14 @@ class PduStore(SQLBaseStore): Returns: list: A list of PduIdTuple. """ - return self._db_pool.runInteraction( - self._get_oldest_pdus_in_context, context - ) - - def _get_oldest_pdus_in_context(self, txn, context): - txn.execute( + results = yield self._execute( + None, "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" % {"back": PduBackwardExtremitiesTable.table_name, }, - (context,) + context ) - return [PduIdTuple(i, o) for i, o in txn.fetchall()] + + defer.returnValue([PduIdTuple(i, o) for i, o in results]) def is_pdu_new(self, pdu_id, origin, context, depth): """For a given Pdu, try and figure out if it's 'new', i.e., if it's -- cgit 1.5.1 From 840771190fc11e51ee0810749a99d87186ccb78b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:32:47 +0100 Subject: Fix bug where we sometimes set min_token to None. --- synapse/storage/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2243a710db..470b7b7663 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -176,7 +176,8 @@ class DataStore(RoomMemberStore, RoomStore, "SELECT MIN(stream_ordering) FROM events" ) - self.min_token = min(row[0][0], -1) if row and row[0] else -1 + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) logger.debug("min_token is: %s", self.min_token) -- cgit 1.5.1 From e8244c23baf150c59e737761c15ce540a3e9e26f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Aug 2014 15:53:07 +0100 Subject: Give the event_id of the failed event --- synapse/storage/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 470b7b7663..7732906927 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -121,7 +121,10 @@ class DataStore(RoomMemberStore, RoomStore, try: yield self._simple_insert("events", vals) except: - logger.exception("Failed to persist, probably duplicate") + logger.exception( + "Failed to persist, probably duplicate: %s", + event_id + ) return if not backfilled and hasattr(event, "state_key"): -- cgit 1.5.1 From 808f663ed179dcb16a315810f7f0f8a7eec77e01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Aug 2014 13:06:07 +0100 Subject: Don't return state event outlier's when paginating. --- synapse/storage/__init__.py | 7 ++++++- synapse/storage/schema/im.sql | 1 + synapse/storage/stream.py | 3 ++- 3 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/storage/__init__.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7732906927..d06033b980 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -105,6 +105,11 @@ class DataStore(RoomMemberStore, RoomStore, "processed": True, } + if hasattr(event, "outlier"): + vals["outlier"] = event.outlier + else: + vals["outlier"] = False + if backfilled: if not self.min_token_deferred.called: yield self.min_token_deferred @@ -123,7 +128,7 @@ class DataStore(RoomMemberStore, RoomStore, except: logger.exception( "Failed to persist, probably duplicate: %s", - event_id + event.event_id ) return diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index ea04261ff0..39a1ed703e 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS events( content TEXT NOT NULL, unrecognized_keys TEXT, processed BOOL NOT NULL, + outlier BOOL NOT NULL, CONSTRAINT ev_uniq UNIQUE (event_id) ); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8bc502483a..87ae961ccd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -177,6 +177,7 @@ class StreamStore(SQLBaseStore): "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "current": current_room_membership_sql, @@ -224,7 +225,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events " - "WHERE room_id = ? AND %(bounds)s " + "WHERE outlier = 0 AND room_id = ? AND %(bounds)s " "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " ) % {"bounds": bounds, "order": order, "limit": limit_str} -- cgit 1.5.1