From 3dfa84bec81e20cfde887fd87114607f58505edf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Aug 2014 12:52:39 +0100 Subject: Convert im schema to a 'one' table structure --- synapse/storage/schema/im.sql | 57 +++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 26 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 77096546b2..e09ff6b64b 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -12,43 +12,48 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -CREATE TABLE IF NOT EXISTS rooms( - room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, - creator TEXT + +CREATE TABLE IF NOT EXISTS events( + ordering INTEGER PRIMARY KEY AUTOINCREMENT, + event_id TEXT NOT NULL, + event_type TEXT NOT NULL, + sender TEXT, + room_id TEXT, + content TEXT, + unrecognized_keys TEXT ); -CREATE TABLE IF NOT EXISTS room_memberships( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server - sender TEXT NOT NULL, +CREATE TABLE IF NOT EXISTS state_events( + event_id TEXT NOT NULL, room_id TEXT NOT NULL, - membership TEXT NOT NULL, - content TEXT NOT NULL + event_type TEXT NOT NULL, + state_key TEXT NOT NULL, + prev_state TEXT ); -CREATE TABLE IF NOT EXISTS messages( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id TEXT, - room_id TEXT, - msg_id TEXT, - content TEXT +CREATE TABLE IF NOT EXISTS current_state( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, +); + +CREATE TABLE IF NOT EXISTS room_memberships( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + sender TEXT NOT NULL, + room_id TEXT NOT NULL, + membership TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS feedback( - id INTEGER PRIMARY KEY AUTOINCREMENT, - content TEXT, + event_id TEXT NOT NULL, feedback_type TEXT, fb_sender_id TEXT, - msg_id TEXT, room_id TEXT, - msg_sender_id TEXT + content TEXT ); -CREATE TABLE IF NOT EXISTS room_data( - id INTEGER PRIMARY KEY AUTOINCREMENT, - room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - content TEXT +CREATE TABLE IF NOT EXISTS rooms( + room_id TEXT PRIMARY KEY NOT NULL, + is_public INTEGER, + creator TEXT ); -- cgit 1.4.1 From 336987bb8debec9dcdbe27d59ce3889b39f86dbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Aug 2014 16:27:14 +0100 Subject: Initial stab at refactoring the SQL tables, including rejigging some of the storage layer. --- synapse/storage/__init__.py | 113 +++++++++++++++++++++++++++--------------- synapse/storage/_base.py | 17 ++++++- synapse/storage/feedback.py | 69 ++++++++------------------ synapse/storage/message.py | 81 ------------------------------ synapse/storage/roomdata.py | 85 ------------------------------- synapse/storage/schema/im.sql | 11 ++-- 6 files changed, 115 insertions(+), 261 deletions(-) delete mode 100644 synapse/storage/message.py delete mode 100644 synapse/storage/roomdata.py (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 3c27428c08..4fcef45e93 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -46,50 +46,83 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, self.event_factory = hs.get_event_factory() self.hs = hs + @defer.inlineCallbacks def persist_event(self, event): - if event.type == MessageEvent.TYPE: - return self.store_message( - user_id=event.user_id, - room_id=event.room_id, - msg_id=event.msg_id, - content=json.dumps(event.content) - ) - elif event.type == RoomMemberEvent.TYPE: - return self.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=event.content["membership"] - ) + if event.type == RoomMemberEvent.TYPE: + yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: - return self.store_feedback( - room_id=event.room_id, - msg_id=event.msg_id, - msg_sender_id=event.msg_sender_id, - fb_sender_id=event.user_id, - fb_type=event.feedback_type, - content=json.dumps(event.content) - ) - elif event.type == RoomTopicEvent.TYPE: - return self.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + yield self._store_feedback(event) elif event.type == RoomConfigEvent.TYPE: - if "visibility" in event.content: - visibility = event.content["visibility"] - return self.store_room_config( - room_id=event.room_id, - visibility=visibility - ) - + yield self._store_room_config(event) + + self._store_event(event) + + @defer.inlineCallbacks + def get_event(self, event_id): + events_dict = yield self._simple_select_one( + "events", + {"event_id": event_id}, + [ + "event_id", + "type", + "sender", + "room_id", + "content", + "unrecognized_keys" + ], + ) + + event = self._parse_event_from_row(events_dict) + defer.returnValue(event) + + @defer.inlineCallbacks + def _store_event(self, event): + vals = { + "event_id": event.event_id, + "event_type", event.type, + "sender": event.user_id, + "room_id": event.room_id, + "content": event.content, + } + + unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} + val["unrecognized_keys"] = unrec + + yield self._simple_insert("events", vals) + + if hasattr(event, "state_key"): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "event_type": event.event_type, + "state_key": event.state_key, + } + + if hasattr(event, "prev_state"): + vals["prev_state"] = event.prev_state + + yield self._simple_insert("state_events", vals) + + # TODO (erikj): We also need to update the current state table? + + @defer.inlineCallbacks + def get_current_state(room_id, event_type=None, state_key="") + sql = ( + "SELECT e.* FROM events as e" + "INNER JOIN current_state as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) + + if event_type: + sql += " s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) else: - raise NotImplementedError( - "Don't know how to persist type=%s" % event.type - ) + args = (room_id, ) + + results = yield self._execute_query(sql, *args) + + defer.returnValue( def schema_path(schema): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 65f691ead4..03537b7e3b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from twisted.internet import defer @@ -20,6 +19,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError import collections +import json + logger = logging.getLogger(__name__) @@ -28,6 +29,7 @@ class SQLBaseStore(object): def __init__(self, hs): self._db_pool = hs.get_db_pool() + self.event_factory = hs.get_event_factory() def cursor_to_dict(self, cursor): """Converts a SQL cursor into an list of dicts. @@ -63,6 +65,9 @@ class SQLBaseStore(object): return decoder(cursor) return self._db_pool.runInteraction(interaction) + def _execut_query(self, query, *args): + return self._execute(self.cursor_to_dict, *args) + # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -279,6 +284,16 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(func) + def _parse_event_from_row(self, row_dict): + d = copy.deepcopy({k: v for k, v in row.items() if v}) + d.update(json.loads(row["unrecognized_keys"])) + del d["unrecognized_keys"] + + return self.event_factory.create_event( + etype=d["type"], + **d + ) + class Table(object): """ A base class used to store information about a particular table. diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index 9bd562c762..fc93f92e1d 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -22,54 +22,27 @@ import json class FeedbackStore(SQLBaseStore): - def store_feedback(self, room_id, msg_id, msg_sender_id, - fb_sender_id, fb_type, content): - return self._simple_insert(FeedbackTable.table_name, dict( - room_id=room_id, - msg_id=msg_id, - msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, - fb_type=fb_type, - content=content, - )) - - def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None, - fb_sender_id=None, fb_type=None): - query = FeedbackTable.select_statement( - "msg_sender_id = ? AND room_id = ? AND msg_id = ? " + - "AND fb_sender_id = ? AND feedback_type = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - FeedbackTable.decode_single_result, - query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type, + def _store_feedback(self, event): + return self._simple_insert("feedback", { + "event_id": event.event_id, + "feedback_type": event.feedback_type, + "room_id": event.room_id, + "target_event_id": event.target_event, + }) + + @defer.inlineCallback + def get_feedback_for_event(self, event_id): + sql = ( + "SELECT events.* FROM events INNER JOIN feedback " + "ON events.event_id = feedback.event_id " + "WHERE feedback.target_event_id = ? " ) - def get_max_feedback_id(self): - return self._simple_max_id(FeedbackTable.table_name) - - -class FeedbackTable(Table): - table_name = "feedback" + rows = yield self._execute_query(sql, event_id) - fields = [ - "id", - "content", - "feedback_type", - "fb_sender_id", - "msg_id", - "room_id", - "msg_sender_id" - ] - - class EntryType(collections.namedtuple("FeedbackEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=FeedbackEvent.TYPE, - room_id=self.room_id, - msg_id=self.msg_id, - msg_sender_id=self.msg_sender_id, - user_id=self.fb_sender_id, - feedback_type=self.feedback_type, - content=json.loads(self.content), - ) + defer.returnValue( + [ + self._parse_event_from_row(r) + for r in rows + ] + ) diff --git a/synapse/storage/message.py b/synapse/storage/message.py deleted file mode 100644 index 7bb69c1384..0000000000 --- a/synapse/storage/message.py +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table -from synapse.api.events.room import MessageEvent - -import collections -import json - - -class MessageStore(SQLBaseStore): - - def get_message(self, user_id, room_id, msg_id): - """Get a message from the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - """ - query = MessagesTable.select_statement( - "user_id = ? AND room_id = ? AND msg_id = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - MessagesTable.decode_single_result, - query, user_id, room_id, msg_id, - ) - - def store_message(self, user_id, room_id, msg_id, content): - """Store a message in the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - content (str): The content of the message (JSON) - """ - return self._simple_insert(MessagesTable.table_name, dict( - user_id=user_id, - room_id=room_id, - msg_id=msg_id, - content=content, - )) - - def get_max_message_id(self): - return self._simple_max_id(MessagesTable.table_name) - - -class MessagesTable(Table): - table_name = "messages" - - fields = [ - "id", - "user_id", - "room_id", - "msg_id", - "content" - ] - - class EntryType(collections.namedtuple("MessageEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=MessageEvent.TYPE, - room_id=self.room_id, - user_id=self.user_id, - msg_id=self.msg_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py deleted file mode 100644 index cc04d1ba14..0000000000 --- a/synapse/storage/roomdata.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table - -import collections -import json - - -class RoomDataStore(SQLBaseStore): - - """Provides various CRUD operations for Room Events. """ - - def get_room_data(self, room_id, etype, state_key=""): - """Retrieve the data stored under this type and state_key. - - Args: - room_id (str) - etype (str) - state_key (str) - Returns: - namedtuple: Or None if nothing exists at this path. - """ - query = RoomDataTable.select_statement( - "room_id = ? AND type = ? AND state_key = ? " - "ORDER BY id DESC LIMIT 1" - ) - return self._execute( - RoomDataTable.decode_single_result, - query, room_id, etype, state_key, - ) - - def store_room_data(self, room_id, etype, state_key="", content=None): - """Stores room specific data. - - Args: - room_id (str) - etype (str) - state_key (str) - data (str)- The data to store for this path in JSON. - Returns: - The store ID for this data. - """ - return self._simple_insert(RoomDataTable.table_name, dict( - etype=etype, - state_key=state_key, - room_id=room_id, - content=content, - )) - - def get_max_room_data_id(self): - return self._simple_max_id(RoomDataTable.table_name) - - -class RoomDataTable(Table): - table_name = "room_data" - - fields = [ - "id", - "room_id", - "type", - "state_key", - "content" - ] - - class EntryType(collections.namedtuple("RoomDataEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=self.type, - room_id=self.room_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index e09ff6b64b..ad9770244d 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -16,8 +16,8 @@ CREATE TABLE IF NOT EXISTS events( ordering INTEGER PRIMARY KEY AUTOINCREMENT, event_id TEXT NOT NULL, - event_type TEXT NOT NULL, - sender TEXT, + type TEXT NOT NULL, +-- sender TEXT, room_id TEXT, content TEXT, unrecognized_keys TEXT @@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS events( CREATE TABLE IF NOT EXISTS state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, - event_type TEXT NOT NULL, + type TEXT NOT NULL, state_key TEXT NOT NULL, prev_state TEXT ); @@ -47,9 +47,8 @@ CREATE TABLE IF NOT EXISTS room_memberships( CREATE TABLE IF NOT EXISTS feedback( event_id TEXT NOT NULL, feedback_type TEXT, - fb_sender_id TEXT, - room_id TEXT, - content TEXT + target_event_id TEXT,sudo + room_id TEXT ); CREATE TABLE IF NOT EXISTS rooms( -- cgit 1.4.1 From beaf4384d906f7fb41d80619d0a46a3f593795db Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Aug 2014 17:43:34 +0100 Subject: Make feedback table also store sender. --- synapse/storage/feedback.py | 1 + synapse/storage/schema/im.sql | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index fc93f92e1d..b9e792c120 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -28,6 +28,7 @@ class FeedbackStore(SQLBaseStore): "feedback_type": event.feedback_type, "room_id": event.room_id, "target_event_id": event.target_event, + "sender": event.user_id, }) @defer.inlineCallback diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index ad9770244d..37b7c6c74f 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -47,7 +47,8 @@ CREATE TABLE IF NOT EXISTS room_memberships( CREATE TABLE IF NOT EXISTS feedback( event_id TEXT NOT NULL, feedback_type TEXT, - target_event_id TEXT,sudo + target_event_id TEXT, + sender TEXT, room_id TEXT ); -- cgit 1.4.1 From 6d6a1c3454ae787e3878202a8e41341ddcf7bee0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 14:30:25 +0100 Subject: Actually encode dicts as json in the DB --- synapse/storage/__init__.py | 4 ++-- synapse/storage/_base.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d38d613450..cd9acdc447 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -81,11 +81,11 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, "event_type", event.type, "sender": event.user_id, "room_id": event.room_id, - "content": event.content, + "content": json.dumps(event.content), } unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} - val["unrecognized_keys"] = unrec + val["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 489b6bd171..5cb26ad6db 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -288,7 +288,8 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row.items() if v}) - d.update(json.loads(row["unrecognized_keys"])) + d.update(json.loads(json.loads(row["unrecognized_keys"]))) + d["content"] = json.loads(d["content"}) del d["unrecognized_keys"] return self.event_factory.create_event( -- cgit 1.4.1 From 937c175029be9bcf2ba26c27ecb879292e8c555a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 16:02:10 +0100 Subject: Fix up RoomMemberStore to work with the new schema. --- synapse/storage/_base.py | 6 +- synapse/storage/roommember.py | 164 ++++++++++++++++++------------------------ synapse/storage/schema/im.sql | 6 +- 3 files changed, 79 insertions(+), 97 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5cb26ad6db..befeb55b25 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -64,7 +64,11 @@ class SQLBaseStore(object): def interaction(txn): cursor = txn.execute(query, args) - return decoder(cursor) + if decoder: + return decoder(cursor) + else: + return cursor + return self._db_pool.runInteraction(interaction) def _execut_query(self, query, *args): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ef73be4af4..60296380e6 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -31,6 +31,38 @@ logger = logging.getLogger(__name__) class RoomMemberStore(SQLBaseStore): + @defer.inlineCallbacks + def _store_room_member(self, event): + """Store a room member in the database. + """ + domain = self.hs.parse_userid(event.target_user_id).domain + + yield self._simple_insert( + "room_memberships", + { + "event_id": event.event_id, + "user_id": event.target_user_id, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, + } + ) + + # Update room hosts table + if event.membership == Membership.JOIN: + sql = ( + "INSERT OR IGNORE INTO room_hosts (room_id, host) " + "VALUES (?, ?)" + ) + yield self._execute(None, sql, room_id, domain) + else: + sql = ( + "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" + ) + + yield self._execute(None, sql, room_id, domain) + + def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -38,36 +70,13 @@ class RoomMemberStore(SQLBaseStore): user_id (str): The member's user ID. room_id (str): The room the member is in. Returns: - namedtuple: The room member from the database, or None if this - member does not exist. + Deferred: Results in a MembershipEvent or None. """ - query = RoomMemberTable.select_statement( - "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1") - return self._execute( - RoomMemberTable.decode_single_result, - query, room_id, user_id, - ) - - def store_room_member(self, user_id, sender, room_id, membership, content): - """Store a room member in the database. - - Args: - user_id (str): The member's user ID. - room_id (str): The room in relation to the member. - membership (synapse.api.constants.Membership): The new membership - state. - content (dict): The content of the membership (JSON). - """ - content_json = json.dumps(content) - return self._simple_insert(RoomMemberTable.table_name, dict( - user_id=user_id, - sender=sender, + return self._get_members_by_dict( room_id=room_id, - membership=membership, - content=content_json, - )) + user_id=user_id + ) - @defer.inlineCallbacks def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -79,17 +88,12 @@ class RoomMemberStore(SQLBaseStore): Returns: list of namedtuples representing the members in this room. """ - query = RoomMemberTable.select_statement( - "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name - + " WHERE room_id = ? GROUP BY user_id)" - ) - res = yield self._execute( - RoomMemberTable.decode_results, query, room_id, - ) - # strip memberships which don't match + + where = {"room_id": room_id} if membership: - res = [entry for entry in res if entry.membership == membership] - defer.returnValue(res) + where["membership"] = membership + + return self._get_members_by_dict(**membership) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -106,67 +110,37 @@ class RoomMemberStore(SQLBaseStore): return defer.succeed(None) args = [user_id] - membership_placeholder = ["membership=?"] * len(membership_list) - where_membership = "(" + " OR ".join(membership_placeholder) + ")" - for membership in membership_list: - args.append(membership) - - query = ("SELECT room_id, membership FROM room_memberships" - + " WHERE user_id=? AND " + where_membership - + " GROUP BY room_id ORDER BY id DESC") - return self._execute( - self.cursor_to_dict, query, *args - ) + args.extend(membership_list) - @defer.inlineCallbacks - def get_joined_hosts_for_room(self, room_id): - query = RoomMemberTable.select_statement( - "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name - + " WHERE room_id = ? GROUP BY user_id)" + where_clause "user_id = ? AND (%s)" % ( + " OR ".join(["membership = ?" for _ in membership_list]), ) - res = yield self._execute( - RoomMemberTable.decode_results, query, room_id, - ) - - def host_from_user_id_string(user_id): - domain = UserID.from_string(entry.user_id, self.hs).domain - return domain - - # strip memberships which don't match - hosts = [ - host_from_user_id_string(entry.user_id) - for entry in res - if entry.membership == Membership.JOIN - ] + return self._get_members_query(where_clause, args) - logger.debug("Returning hosts: %s from results: %s", hosts, res) - - defer.returnValue(hosts) - - def get_max_room_member_id(self): - return self._simple_max_id(RoomMemberTable.table_name) - - -class RoomMemberTable(Table): - table_name = "room_memberships" - - fields = [ - "id", - "user_id", - "sender", - "room_id", - "membership", - "content" - ] + def get_joined_hosts_for_room(self, room_id): + return self._simple_select_onecol( + "room_hosts", + {"room_id": room_id}, + "host" + ) - class EntryType(collections.namedtuple("RoomMemberEntry", fields)): + def _get_members_by_dict(self, where_dict): + clause = " AND ".join("%s = ?" % k for k in where.keys()) + vals = where.values() + return self._get_members_query(clause, vals) - def as_event(self, event_factory): - return event_factory.create_event( - etype=RoomMemberEvent.TYPE, - room_id=self.room_id, - target_user_id=self.user_id, - user_id=self.sender, - content=json.loads(self.content), - ) + @defer.inlineCallbacks + def _get_members_query(self, where_clause, where_values): + sql = ( + "SELECT e.* FROM events as e " + "INNER JOIN room_memberships as m " + "ON e.event_id = m.event_id " + "INNER JOIN current_state as c " + "ON m.event_id = c.event_id " + "WHERE %s " + ) % (where_clause,) + + rows = yield self._execute_query(sql, where_values) + results = [self._parse_event_from_row(r) for r in rows] + defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 37b7c6c74f..7f564c8540 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -17,7 +17,6 @@ CREATE TABLE IF NOT EXISTS events( ordering INTEGER PRIMARY KEY AUTOINCREMENT, event_id TEXT NOT NULL, type TEXT NOT NULL, --- sender TEXT, room_id TEXT, content TEXT, unrecognized_keys TEXT @@ -57,3 +56,8 @@ CREATE TABLE IF NOT EXISTS rooms( is_public INTEGER, creator TEXT ); + +CREATE TABLE IF NOT EXISTS room_hosts( + room_id TEXT NOT NULL, + host TEXT NOT NULL +); -- cgit 1.4.1 From 2529f2bc01781314ecdedd69e272c737ba1a71f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 16:58:51 +0100 Subject: Rename _execute_query --- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/feedback.py | 2 +- synapse/storage/roommember.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index cd9acdc447..75d93f7111 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -119,7 +119,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, else: args = (room_id, ) - results = yield self._execute_query(sql, *args) + results = yield self._execute_and_decode(sql, *args) defer.returnValue( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index befeb55b25..7fef8601e7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -71,7 +71,7 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(interaction) - def _execut_query(self, query, *args): + def _execute_and_decode(self, query, *args): return self._execute(self.cursor_to_dict, *args) # "Simple" SQL API methods that operate on a single table with no JOINs, diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index b9e792c120..dd5f3fbc10 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -39,7 +39,7 @@ class FeedbackStore(SQLBaseStore): "WHERE feedback.target_event_id = ? " ) - rows = yield self._execute_query(sql, event_id) + rows = yield self._execute_and_decode(sql, event_id) defer.returnValue( [ diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 60296380e6..c99cefbcfc 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -141,6 +141,6 @@ class RoomMemberStore(SQLBaseStore): "WHERE %s " ) % (where_clause,) - rows = yield self._execute_query(sql, where_values) + rows = yield self._execute_and_decode(sql, where_values) results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) -- cgit 1.4.1 From 78b501eba68f93c91f53ddf179ebde32f511894f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 17:09:28 +0100 Subject: Fix typo --- synapse/storage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 75d93f7111..afdd75f46d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -121,7 +121,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, results = yield self._execute_and_decode(sql, *args) - defer.returnValue( + defer.returnValue([self._parse_event_from_row(r) for r in results]) def schema_path(schema): -- cgit 1.4.1 From 661c7117659118ed977f56a092525dbdae9dc67c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 17:34:37 +0100 Subject: Start fixing places that use the data store. --- synapse/handlers/room.py | 17 ++++------------- synapse/rest/room.py | 39 +++++++++++++++++++++------------------ synapse/storage/__init__.py | 8 +++----- synapse/storage/_base.py | 2 +- synapse/storage/feedback.py | 4 +++- synapse/storage/roommember.py | 2 +- 6 files changed, 33 insertions(+), 39 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index eae40765b3..a9ff2d93f1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -141,12 +141,7 @@ class MessageHandler(BaseHandler): yield self.state_handler.handle_new_event(event) # store in db - store_id = yield self.store.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + store_id = yield self.store.persist_event(event) event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id @@ -201,19 +196,15 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_room_data(room_id, event_type, state_key) + data = yield self.store.get_current_state(room_id, event_type, state_key) defer.returnValue(data) @defer.inlineCallbacks - def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None, - user_id=None, fb_sender_id=None, fb_type=None): + def get_feedback(self, event_id): yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db - fb = yield self.store.get_feedback( - room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, fb_type=fb_type - ) + fb = yield self.store.get_feedback(event_id) if fb: defer.returnValue(fb) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 1fc0c996b8..3f153df8e3 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -285,25 +285,28 @@ class FeedbackRestServlet(RestServlet): feedback_type): user = yield (self.auth.get_user_by_req(request)) - if feedback_type not in Feedback.LIST: - raise SynapseError(400, "Bad feedback type.", - errcode=Codes.BAD_JSON) - - msg_handler = self.handlers.message_handler - feedback = yield msg_handler.get_feedback( - room_id=urllib.unquote(room_id), - msg_sender_id=msg_sender_id, - msg_id=msg_id, - user_id=user.to_string(), - fb_sender_id=fb_sender_id, - fb_type=feedback_type - ) - - if not feedback: - raise SynapseError(404, "Feedback not found.", - errcode=Codes.NOT_FOUND) + # TODO (erikj): Implement this? + raise NotImplementedError("Getting feedback is not supported") - defer.returnValue((200, json.loads(feedback.content))) +# if feedback_type not in Feedback.LIST: +# raise SynapseError(400, "Bad feedback type.", +# errcode=Codes.BAD_JSON) +# +# msg_handler = self.handlers.message_handler +# feedback = yield msg_handler.get_feedback( +# room_id=urllib.unquote(room_id), +# msg_sender_id=msg_sender_id, +# msg_id=msg_id, +# user_id=user.to_string(), +# fb_sender_id=fb_sender_id, +# fb_type=feedback_type +# ) +# +# if not feedback: +# raise SynapseError(404, "Feedback not found.", +# errcode=Codes.NOT_FOUND) +# +# defer.returnValue((200, json.loads(feedback.content))) @defer.inlineCallbacks def on_PUT(self, request, room_id, sender_id, msg_id, fb_sender_id, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index afdd75f46d..182b6ebadd 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -21,13 +21,11 @@ from synapse.api.events.room import ( from .directory import DirectoryStore from .feedback import FeedbackStore -from .message import MessageStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore -from .roomdata import RoomDataStore from .stream import StreamStore from .pdu import StatePduStore, PduStore from .transactions import TransactionStore @@ -36,7 +34,7 @@ import json import os -class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, +class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, DirectoryStore): @@ -78,7 +76,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, def _store_event(self, event): vals = { "event_id": event.event_id, - "event_type", event.type, + "event_type": event.type, "sender": event.user_id, "room_id": event.room_id, "content": json.dumps(event.content), @@ -105,7 +103,7 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, # TODO (erikj): We also need to update the current state table? @defer.inlineCallbacks - def get_current_state(room_id, event_type=None, state_key="") + def get_current_state(room_id, event_type=None, state_key=""): sql = ( "SELECT e.* FROM events as e" "INNER JOIN current_state as c ON e.event_id = c.event_id " diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7fef8601e7..533f509709 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -293,7 +293,7 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row.items() if v}) d.update(json.loads(json.loads(row["unrecognized_keys"]))) - d["content"] = json.loads(d["content"}) + d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] return self.event_factory.create_event( diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index dd5f3fbc10..e60f98d1e1 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table from synapse.api.events.room import FeedbackEvent @@ -31,7 +33,7 @@ class FeedbackStore(SQLBaseStore): "sender": event.user_id, }) - @defer.inlineCallback + @defer.inlineCallbacks def get_feedback_for_event(self, event_id): sql = ( "SELECT events.* FROM events INNER JOIN feedback " diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c99cefbcfc..14c0152e8a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -112,7 +112,7 @@ class RoomMemberStore(SQLBaseStore): args = [user_id] args.extend(membership_list) - where_clause "user_id = ? AND (%s)" % ( + where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) -- cgit 1.4.1 From 7e681ad778b18f47fc7d6f410c6b1e87a1a99f20 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 18:01:39 +0100 Subject: Update StreamStore --- synapse/storage/stream.py | 281 ++++++---------------------------------------- 1 file changed, 36 insertions(+), 245 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1dedffac49..1456d216f0 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -20,264 +20,55 @@ from .feedback import FeedbackTable from .roomdata import RoomDataTable from .roommember import RoomMemberTable +from synapse.api.constants import Membership + import json import logging -logger = logging.getLogger(__name__) - - -class StreamStore(SQLBaseStore): - - def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0, - with_feedback=False): - """Get all messages for this user between the given keys. - - Args: - user_id (str): The user who is requesting messages. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - room_id (str): Gets messages only for this room. Can be None, in - which case all room messages will be returned. - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - if with_feedback and room_id: # with fb MUST specify a room ID - return self._db_pool.runInteraction( - self._get_message_rows_with_feedback, - user_id, from_key, to_key, room_id, limit - ) - else: - return self._db_pool.runInteraction( - self._get_message_rows, - user_id, from_key, to_key, room_id, limit - ) - - def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the messages table, bounded by the specified pkeys - - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ("SELECT messages.* FROM messages WHERE ? IN" - + " (SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] - - if room_id: - query += " AND messages.room_id=?" - query_args.append(room_id) - - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, limit=limit - ) - - logger.debug("[SQL] %s : %s", query, query_args) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, MessagesTable, from_pkey) - - def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey, - room_id, limit): - # this col represents the compressed feedback JSON as per spec - compressed_feedback_col = ( - "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id" - + " || '\",\"feedback_type\":\"' || f.feedback_type" - + " || '\",\"content\":' || f.content || '}') || ']'" - ) - - global_msg_id_join = ("f.room_id = messages.room_id" - + " and f.msg_id = messages.msg_id" - + " and messages.user_id = f.msg_sender_id") - - select_query = ( - "SELECT messages.*, f.content AS fb_content, f.fb_sender_id" - + ", " + compressed_feedback_col + " AS compressed_fb" - + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join) - - current_membership_sub_query = ( - "(SELECT membership from room_memberships rm" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") - - where = (" WHERE ? IN " + current_membership_sub_query - + " AND messages.room_id=?") - - query = select_query + where - query_args = ["join", user_id, room_id] - - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, - limit=limit, group_by=" GROUP BY messages.id " - ) - - logger.debug("[SQL] %s : %s", query, query_args) - cursor = txn.execute(query, query_args) - # convert the result set into events - entries = self.cursor_to_dict(cursor) - events = [] - for entry in entries: - # TODO we should spec the cursor > event mapping somewhere else. - event = {} - straight_mappings = ["msg_id", "user_id", "room_id"] - for key in straight_mappings: - event[key] = entry[key] - event["content"] = json.loads(entry["content"]) - if entry["compressed_fb"]: - event["feedback"] = json.loads(entry["compressed_fb"]) - events.append(event) - - latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"] - - return (events, latest_pkey) - - def get_room_member_stream(self, user_id, from_key, to_key): - """Get all room membership events for this user between the given keys. - - Args: - user_id (str): The user who is requesting membership events. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - return self._db_pool.runInteraction( - self._get_room_member_rows, user_id, from_key, to_key - ) - - def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey): - # get all room membership events for rooms which the user is - # *currently* joined in on, or all invite events for this user. - current_membership_sub_query = ( - "(SELECT membership FROM room_memberships" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") - - query = ("SELECT rm.* FROM room_memberships rm " - # all membership events for rooms you've currently joined. - + " WHERE (? IN " + current_membership_sub_query - # all invite membership events for this user - + " OR rm.membership=? AND user_id=?)" - + " AND rm.id > ?") - query_args = ["join", user_id, "invite", user_id, from_pkey] - - if to_pkey != -1: - query += " AND rm.id < ?" - query_args.append(to_pkey) +logger = logging.getLogger(__name__) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomMemberTable, from_pkey) - def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0): - return self._db_pool.runInteraction( - self._get_feedback_rows, - user_id, from_key, to_key, room_id, limit - ) +MAX_STREAM_SIZE = 1000 - def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT feedback.* FROM feedback WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] +class StreamStore(SQLBaseStore): - if room_id: - query += " AND feedback.room_id=?" - query_args.append(room_id) + @defer.inlineCallbacks + def get_room_events_stream(self, user_id, from_key, to_key, room_id, + limit=0, with_feedback=False): - (query, query_args) = self._append_stream_operations( - "feedback", query, query_args, from_pkey, to_pkey, limit=limit + current_room_membership_sql = ( + "SELECT m.room_id FROM room_memberships as m " + "INNER JOIN current_state as c ON m.event_id = c.event_id " + "WHERE m.user_id = ?" ) - logger.debug("[SQL] %s : %s", query, query_args) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, FeedbackTable, from_pkey) - - def get_room_data_stream(self, user_id, from_key, to_key, room_id, - limit=0): - return self._db_pool.runInteraction( - self._get_room_data_rows, - user_id, from_key, to_key, room_id, limit + invites_sql = ( + "SELECT m.event_id FROM room_membershipas as m " + "INNER JOIN current_state as c ON m.event_id = c.event_id " + "WHERE m.user_id = ? AND m.membership = ?" ) - def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys - - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT room_data.* FROM room_data WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] - - if room_id: - query += " AND room_data.room_id=?" - query_args.append(room_id) - - (query, query_args) = self._append_stream_operations( - "room_data", query, query_args, from_pkey, to_pkey, limit=limit + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = 1000 + + sql = ( + "SELECT * FROM events as e WHERE " + "(room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s)) " + "ORDER BY ordering ASC LIMIT %(limit)d" + ) % { + "current": current_room_membership_sql, + "invites": invites_sql, + "limit": limit, + } + + rows = yield self._execute_and_decode( + sql, + user_id, user_id, Membership.INVITE ) - logger.debug("[SQL] %s : %s", query, query_args) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomDataTable, from_pkey) - - def _append_stream_operations(self, table_name, query, query_args, - from_pkey, to_pkey, limit=None, - group_by=""): - LATEST_ROW = -1 - order_by = "" - if to_pkey > from_pkey: - if from_pkey != LATEST_ROW: - # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9 - query += (" AND %s.id > ? AND %s.id < ?" % - (table_name, table_name)) - query_args.append(from_pkey) - query_args.append(to_pkey) - else: - # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC - query += " AND %s.id > ? " % table_name - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - elif from_pkey > to_pkey: - if to_pkey != LATEST_ROW: - # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC - query += (" AND %s.id > ? AND %s.id < ? " % - (table_name, table_name)) - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - query_args.append(from_pkey) - else: - # from=5 to=-1 >> from 5 to now >> id>5 - query += " AND %s.id > ?" % table_name - query_args.append(from_pkey) - - query += group_by + order_by - - if limit and limit > 0: - query += " LIMIT ?" - query_args.append(str(limit)) - - return (query, query_args) - - def _as_events(self, cursor, table, from_pkey): - data_entries = table.decode_results(cursor) - last_pkey = from_pkey - if data_entries: - last_pkey = data_entries[-1].id - - events = [ - entry.as_event(self.event_factory).get_dict() - for entry in data_entries - ] - - return (events, last_pkey) + defer.returnValue([self._parse_event_from_row(r) for r in results]) -- cgit 1.4.1 From 2c46bb620828efaebdbae37e5212a28b505ee72d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Aug 2014 18:40:50 +0100 Subject: Fix up typos and correct sql queries --- synapse/handlers/room.py | 10 ++-------- synapse/storage/__init__.py | 20 ++++++++++---------- synapse/storage/_base.py | 9 +++++---- synapse/storage/roommember.py | 26 +++++++++++++------------- synapse/storage/schema/im.sql | 4 ++-- synapse/storage/stream.py | 11 ++++------- 6 files changed, 36 insertions(+), 44 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a9ff2d93f1..9b55206e47 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -201,7 +201,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_feedback(self, event_id): - yield self.auth.check_joined_room(room_id, user_id) + # yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db fb = yield self.store.get_feedback(event_id) @@ -690,13 +690,7 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): # store membership - store_id = yield self.store.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=membership - ) + store_id = yield self.store.persist_event(event) # Send a PDU to all hosts who have joined the room. destinations = yield self.store.get_joined_hosts_for_room( diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 182b6ebadd..f41c21dcd2 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, @@ -52,7 +53,7 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomConfigEvent.TYPE: yield self._store_room_config(event) - self._store_event(event) + yield self._store_event(event) @defer.inlineCallbacks def get_event(self, event_id): @@ -76,14 +77,13 @@ class DataStore(RoomMemberStore, RoomStore, def _store_event(self, event): vals = { "event_id": event.event_id, - "event_type": event.type, - "sender": event.user_id, + "type": event.type, "room_id": event.room_id, "content": json.dumps(event.content), } - unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()} - val["unrecognized_keys"] = json.dumps(unrec) + unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()} + vals["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) @@ -91,7 +91,7 @@ class DataStore(RoomMemberStore, RoomStore, vals = { "event_id": event.event_id, "room_id": event.room_id, - "event_type": event.event_type, + "type": event.type, "state_key": event.state_key, } @@ -103,16 +103,16 @@ class DataStore(RoomMemberStore, RoomStore, # TODO (erikj): We also need to update the current state table? @defer.inlineCallbacks - def get_current_state(room_id, event_type=None, state_key=""): + def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( - "SELECT e.* FROM events as e" - "INNER JOIN current_state as c ON e.event_id = c.event_id " + "SELECT e.* FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " "INNER JOIN state_events as s ON e.event_id = s.event_id " "WHERE c.room_id = ? " ) if event_type: - sql += " s.type = ? AND s.state_key = ? " + sql += " AND s.type = ? AND s.state_key = ? " args = (room_id, event_type, state_key) else: args = (room_id, ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 533f509709..c8ec63f30a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError import collections +import copy import json @@ -59,7 +60,7 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ + "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ if decoder else None ) def interaction(txn): @@ -72,7 +73,7 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(interaction) def _execute_and_decode(self, query, *args): - return self._execute(self.cursor_to_dict, *args) + return self._execute(self.cursor_to_dict, query, *args) # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -291,8 +292,8 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(func) def _parse_event_from_row(self, row_dict): - d = copy.deepcopy({k: v for k, v in row.items() if v}) - d.update(json.loads(json.loads(row["unrecognized_keys"]))) + d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + d.update(json.loads(json.loads(row_dict["unrecognized_keys"]))) d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 14c0152e8a..8c4b04f190 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -54,13 +54,13 @@ class RoomMemberStore(SQLBaseStore): "INSERT OR IGNORE INTO room_hosts (room_id, host) " "VALUES (?, ?)" ) - yield self._execute(None, sql, room_id, domain) + yield self._execute(None, sql, event.room_id, domain) else: sql = ( "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" ) - yield self._execute(None, sql, room_id, domain) + yield self._execute(None, sql, event.room_id, domain) def get_room_member(self, user_id, room_id): @@ -72,10 +72,10 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - return self._get_members_by_dict( - room_id=room_id, - user_id=user_id - ) + return self._get_members_by_dict({ + "e.room_id": room_id, + "m.user_id": user_id, + }) def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -89,11 +89,11 @@ class RoomMemberStore(SQLBaseStore): list of namedtuples representing the members in this room. """ - where = {"room_id": room_id} + where = {"m.room_id": room_id} if membership: - where["membership"] = membership + where["m.membership"] = membership - return self._get_members_by_dict(**membership) + return self._get_members_by_dict(where) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -126,8 +126,8 @@ class RoomMemberStore(SQLBaseStore): ) def _get_members_by_dict(self, where_dict): - clause = " AND ".join("%s = ?" % k for k in where.keys()) - vals = where.values() + clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) + vals = where_dict.values() return self._get_members_query(clause, vals) @defer.inlineCallbacks @@ -136,11 +136,11 @@ class RoomMemberStore(SQLBaseStore): "SELECT e.* FROM events as e " "INNER JOIN room_memberships as m " "ON e.event_id = m.event_id " - "INNER JOIN current_state as c " + "INNER JOIN current_state_events as c " "ON m.event_id = c.event_id " "WHERE %s " ) % (where_clause,) - rows = yield self._execute_and_decode(sql, where_values) + rows = yield self._execute_and_decode(sql, *where_values) results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 7f564c8540..85c0c7119c 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -30,9 +30,9 @@ CREATE TABLE IF NOT EXISTS state_events( prev_state TEXT ); -CREATE TABLE IF NOT EXISTS current_state( +CREATE TABLE IF NOT EXISTS current_state_events( event_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1456d216f0..9937239c22 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -13,12 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from ._base import SQLBaseStore -from .message import MessagesTable -from .feedback import FeedbackTable -from .roomdata import RoomDataTable -from .roommember import RoomMemberTable from synapse.api.constants import Membership @@ -40,13 +37,13 @@ class StreamStore(SQLBaseStore): current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state as c ON m.event_id = c.event_id " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ?" ) invites_sql = ( "SELECT m.event_id FROM room_membershipas as m " - "INNER JOIN current_state as c ON m.event_id = c.event_id " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ? AND m.membership = ?" ) @@ -71,4 +68,4 @@ class StreamStore(SQLBaseStore): user_id, user_id, Membership.INVITE ) - defer.returnValue([self._parse_event_from_row(r) for r in results]) + defer.returnValue([self._parse_event_from_row(r) for r in rows]) -- cgit 1.4.1 From 5002efa31bb57a92b87b9d7319641d9b5a2a6047 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 10:26:35 +0100 Subject: Reimplement the get public rooms api to work with new DB schema --- synapse/api/events/factory.py | 3 +- synapse/api/events/room.py | 23 +++++++++++ synapse/handlers/room.py | 2 +- synapse/storage/__init__.py | 6 ++- synapse/storage/_base.py | 2 +- synapse/storage/room.py | 90 +++++++++++++++++++++++++++---------------- synapse/storage/schema/im.sql | 12 ++++++ 7 files changed, 101 insertions(+), 37 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 12aa04fc6e..23d2b0401c 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -15,7 +15,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, - InviteJoinEvent, RoomConfigEvent + InviteJoinEvent, RoomConfigEvent, RoomNameEvent, ) from synapse.util.stringutils import random_string @@ -25,6 +25,7 @@ class EventFactory(object): _event_classes = [ RoomTopicEvent, + RoomNameEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index f3df849af2..8136d495d5 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -19,14 +19,37 @@ from . import SynapseEvent class RoomTopicEvent(SynapseEvent): TYPE = "m.room.topic" + internal_keys = SynapseEvent.internal_keys + [ + "topic", + ] + def __init__(self, **kwargs): kwargs["state_key"] = "" + if "topic" in kwargs["content"]: + kwargs["topic"] = kwargs["content"]["topic"] super(RoomTopicEvent, self).__init__(**kwargs) def get_content_template(self): return {"topic": u"string"} +class RoomNameEvent(SynapseEvent): + TYPE = "m.room.name" + + internal_keys = SynapseEvent.internal_keys + [ + "name", + ] + + def __init__(self, **kwargs): + kwargs["state_key"] = "" + if "name" in kwargs["content"]: + kwargs["name"] = kwargs["content"]["name"] + super(RoomNameEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {"name": u"string"} + + class RoomMemberEvent(SynapseEvent): TYPE = "m.room.member" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9b55206e47..5c1b59dbc9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -790,5 +790,5 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): - chunk = yield self.store.get_rooms(is_public=True, with_topics=True) + chunk = yield self.store.get_rooms(is_public=True) defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f41c21dcd2..f62cee3c39 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, - RoomConfigEvent + RoomConfigEvent, RoomNameEvent, ) from .directory import DirectoryStore @@ -52,6 +52,10 @@ class DataStore(RoomMemberStore, RoomStore, yield self._store_feedback(event) elif event.type == RoomConfigEvent.TYPE: yield self._store_room_config(event) + elif event.type == RoomNameEvent.TYPE: + yield self._store_room_name(event) + elif event.type == RoomTopicEvent.TYPE: + yield self._store_room_topic(event) yield self._store_event(event) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c8ec63f30a..c26e9a0f98 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -68,7 +68,7 @@ class SQLBaseStore(object): if decoder: return decoder(cursor) else: - return cursor + return cursor.fetchall() return self._db_pool.runInteraction(interaction) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a97162831b..8f44b67d8c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -76,49 +76,73 @@ class RoomStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_rooms(self, is_public, with_topics): + def get_rooms(self, is_public): """Retrieve a list of all public rooms. Args: is_public (bool): True if the rooms returned should be public. - with_topics (bool): True to include the current topic for the room - in the response. Returns: - A list of room dicts containing at least a "room_id" key, and a - "topic" key if one is set and with_topic=True. + A list of room dicts containing at least a "room_id" key, a + "topic" key if one is set, and a "name" key if one is set """ - room_data_type = RoomTopicEvent.TYPE - public = 1 if is_public else 0 - - latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE " - + "room_data.type = ? GROUP BY room_id") - - query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms " - + "LEFT JOIN " - + "room_aliases ON room_aliases.room_id = rooms.room_id " - + "LEFT JOIN " - + "room_data ON rooms.room_id = room_data.room_id WHERE " - + "(room_data.id IN (" + latest_topic + ") " - + "OR room_data.id IS NULL) AND rooms.is_public = ?") - - res = yield self._execute( - self.cursor_to_dict, query, room_data_type, public + + topic_subquery = ( + "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics " + "INNER JOIN current_state_events as c " + "ON c.event_id = topics.event_id " ) - # return only the keys the specification expects - ret_keys = ["room_id", "topic", "room_alias"] + name_subquery = ( + "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names " + "INNER JOIN current_state_events as c " + "ON c.event_id = room_names.event_id " + ) - # extract topic from the json (icky) FIXME - for i, room_row in enumerate(res): - try: - content_json = json.loads(room_row["content"]) - room_row["topic"] = content_json["topic"] - except: - pass # no topic set - # filter the dict based on ret_keys - res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys} + sql = ( + "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r " + "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " + "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " + "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " + "WHERE r.is_public = ? " + "GROUP BY r.room_id " + ) % { + "topic": topic_subquery, + "name": name_subquery, + } + + rows = yield self._execute(None, sql, is_public) + + ret = [ + { + "room_id": r[0], + "name": r[1], + "topic": r[2], + "aliases": r[3].split(","), + } + for r in rows + ] + + defer.returnValue(ret) + + def _store_room_topic(self, event): + return self._simple_insert( + "topics", + { + "event_id": event.event_id, + "room_id": event.room_id, + "topic": event.topic, + } + ) - defer.returnValue(res) + def _store_room_name(self, event): + return self._simple_insert( + "room_names", + { + "event_id": event.event_id, + "room_id": event.room_id, + "name": event.name, + } + ) class RoomsTable(Table): diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 85c0c7119c..9a0f2145d5 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -51,6 +51,18 @@ CREATE TABLE IF NOT EXISTS feedback( room_id TEXT ); +CREATE TABLE IF NOT EXISTS topics( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + topic TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS room_names( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + name TEXT NOT NULL +); + CREATE TABLE IF NOT EXISTS rooms( room_id TEXT PRIMARY KEY NOT NULL, is_public INTEGER, -- cgit 1.4.1 From 114984a2361ee41005a769f6dc127c470ee08aee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 13:58:28 +0100 Subject: Start chagning the events stream to work with the new DB schema --- synapse/api/streams/event.py | 77 +++-------------------------------------- synapse/handlers/events.py | 8 ++--- synapse/handlers/room.py | 79 +++++++++++++++++++++++-------------------- synapse/storage/__init__.py | 10 +++++- synapse/storage/_base.py | 2 +- synapse/storage/roommember.py | 8 ++++- synapse/storage/schema/im.sql | 5 ++- synapse/storage/stream.py | 31 +++++++++++++++++ 8 files changed, 102 insertions(+), 118 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 4b6d739e54..427363cad4 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -28,17 +28,17 @@ import logging logger = logging.getLogger(__name__) -class MessagesStreamData(StreamData): - EVENT_TYPE = MessageEvent.TYPE +class EventsStreamData(StreamData): + EVENT_TYPE = "EventsStream" def __init__(self, hs, room_id=None, feedback=False): - super(MessagesStreamData, self).__init__(hs) + super(EventsStreamData, self).__init__(hs) self.room_id = room_id self.with_feedback = feedback @defer.inlineCallbacks def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_message_stream( + data, latest_ver = yield self.store.get_room_events_stream( user_id=user_id, from_key=from_key, to_key=to_key, @@ -50,74 +50,7 @@ class MessagesStreamData(StreamData): @defer.inlineCallbacks def max_token(self): - val = yield self.store.get_max_message_id() - defer.returnValue(val) - - -class RoomMemberStreamData(StreamData): - EVENT_TYPE = RoomMemberEvent.TYPE - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_member_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key - ) - - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_member_id() - defer.returnValue(val) - - -class FeedbackStreamData(StreamData): - EVENT_TYPE = FeedbackEvent.TYPE - - def __init__(self, hs, room_id=None): - super(FeedbackStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_feedback_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_feedback_id() - defer.returnValue(val) - - -class RoomDataStreamData(StreamData): - EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types - - def __init__(self, hs, room_id=None): - super(RoomDataStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_data_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_data_id() + val = yield self.store.get_room_events_max_id() defer.returnValue(val) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 3af7d824a2..6bb797caf2 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,8 +17,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.streams.event import ( - EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData, - RoomDataStreamData + EventStream, EventsStreamData ) from synapse.handlers.presence import PresenceStreamData @@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData class EventStreamHandler(BaseHandler): stream_data_classes = [ - MessagesStreamData, - RoomMemberStreamData, - FeedbackStreamData, - RoomDataStreamData, + EventsStreamData, PresenceStreamData, ] diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 432d13982a..3451250008 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, MessagesStreamData +from synapse.api.streams.event import EventStream, EventsStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -97,30 +97,30 @@ class MessageHandler(BaseHandler): self.notifier.on_new_room_event(event, store_id) yield self.hs.get_federation().handle_new_event(event) - - @defer.inlineCallbacks - def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False): - """Get messages in a room. - - Args: - user_id (str): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - feedback (bool): True to get compressed feedback with the messages - Returns: - dict: Pagination API results - """ - yield self.auth.check_joined_room(room_id, user_id) - - data_source = [MessagesStreamData(self.hs, room_id=room_id, - feedback=feedback)] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) - +# +# @defer.inlineCallbacks +# def get_messages(self, user_id=None, room_id=None, pagin_config=None, +# feedback=False): +# """Get messages in a room. +# +# Args: +# user_id (str): The user requesting messages. +# room_id (str): The room they want messages from. +# pagin_config (synapse.api.streams.PaginationConfig): The pagination +# config rules to apply, if any. +# feedback (bool): True to get compressed feedback with the messages +# Returns: +# dict: Pagination API results +# """ +# yield self.auth.check_joined_room(room_id, user_id) +# +# data_source = [MessagesStreamData(self.hs, room_id=room_id, +# feedback=feedback)] +# event_stream = EventStream(user_id, data_source) +# pagin_config = yield event_stream.fix_tokens(pagin_config) +# data_chunk = yield event_stream.get_chunk(config=pagin_config) +# defer.returnValue(data_chunk) +# @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): """ Stores data for a room. @@ -251,20 +251,27 @@ class MessageHandler(BaseHandler): user_id=user_id, membership_list=[Membership.INVITE, Membership.JOIN] ) - for room_info in room_list: - if room_info["membership"] != Membership.JOIN: + + ret = [] + + for event in room_list: + d = event.get_dict() + ret.append(d) + + if event.membership != Membership.JOIN: continue try: - event_chunk = yield self.get_messages( - user_id=user_id, - pagin_config=pagin_config, - feedback=feedback, - room_id=room_info["room_id"] + messages = yield self.store.get_recent_events_for_room( + event.room_id, + limit=50, ) - room_info["messages"] = event_chunk + d["messages"] = [m.get_dict() for m in messages] except: pass - defer.returnValue(room_list) + + logger.debug("snapshot_all_rooms returning: %s", ret) + + defer.returnValue(ret) class RoomCreationHandler(BaseHandler): @@ -442,7 +449,7 @@ class RoomMemberHandler(BaseHandler): member_list = yield self.store.get_room_members(room_id=room_id) event_list = [ - entry.as_event(self.event_factory).get_dict() + entry.get_dict() for entry in member_list ] chunk_data = { @@ -685,7 +692,7 @@ class RoomMemberHandler(BaseHandler): user_id=user.to_string(), membership_list=membership_list ) - defer.returnValue([r["room_id"] for r in rooms]) + defer.returnValue([r.room_id for r in rooms]) @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f62cee3c39..46b9dbcbbf 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -104,7 +104,15 @@ class DataStore(RoomMemberStore, RoomStore, yield self._simple_insert("state_events", vals) - # TODO (erikj): We also need to update the current state table? + yield self._simple_insert( + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + ) @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c26e9a0f98..413838f798 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -293,7 +293,7 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) - d.update(json.loads(json.loads(row_dict["unrecognized_keys"]))) + d.update(json.loads(row_dict["unrecognized_keys"])) d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8c4b04f190..a0620677b9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -63,6 +63,7 @@ class RoomMemberStore(SQLBaseStore): yield self._execute(None, sql, event.room_id, domain) + @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -72,11 +73,13 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - return self._get_members_by_dict({ + rows = yield self._get_members_by_dict({ "e.room_id": room_id, "m.user_id": user_id, }) + defer.returnValue(rows[0] if rows else None) + def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -142,5 +145,8 @@ class RoomMemberStore(SQLBaseStore): ) % (where_clause,) rows = yield self._execute_and_decode(sql, *where_values) + + logger.debug("_get_members_query Got rows %s", rows) + results = [self._parse_event_from_row(r) for r in rows] defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 9a0f2145d5..2452890ea4 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -32,7 +32,10 @@ CREATE TABLE IF NOT EXISTS state_events( CREATE TABLE IF NOT EXISTS current_state_events( event_id TEXT NOT NULL, - room_id TEXT NOT NULL + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 9937239c22..c5c3770a40 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -34,6 +34,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): + # TODO (erikj): Handle compressed feedback current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " @@ -69,3 +70,33 @@ class StreamStore(SQLBaseStore): ) defer.returnValue([self._parse_event_from_row(r) for r in rows]) + + @defer.inlineCallbacks + def get_recent_events_for_room(self, room_id, limit, with_feedback=False): + # TODO (erikj): Handle compressed feedback + + sql = ( + "SELECT * FROM events WHERE room_id = ? " + "ORDER BY ordering DESC LIMIT ? " + ) + + rows = yield self._execute_and_decode( + sql, + room_id, limit + ) + + rows.reverse() # As we selected with reverse ordering + + defer.returnValue([self._parse_event_from_row(r) for r in rows]) + + @defer.inlineCallbacks + def get_room_events_max_id(self): + res = yield self._execute_and_decode( + "SELECT MAX(ordering) as m FROM events" + ) + + if not res: + defer.returnValue(0) + return + + defer.returnValue(res[0]["m"]) -- cgit 1.4.1 From 01f089d9fbb9b89fa143ac44e51529fa8ed7ec12 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 15:28:54 +0100 Subject: Correctly return new token when returning events. Serialize events correctly. --- synapse/api/notifier.py | 3 ++- synapse/api/streams/event.py | 2 +- synapse/handlers/room.py | 5 ++++- synapse/storage/__init__.py | 6 +++++- synapse/storage/stream.py | 18 +++++++++++++----- 5 files changed, 25 insertions(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py index 65b5a4ebb3..9f622df6bb 100644 --- a/synapse/api/notifier.py +++ b/synapse/api/notifier.py @@ -15,6 +15,7 @@ from synapse.api.constants import Membership from synapse.api.events.room import RoomMemberEvent +from synapse.api.streams.event import EventsStreamData from twisted.internet import defer from twisted.internet import reactor @@ -66,7 +67,7 @@ class Notifier(object): self._notify_and_callback( user_id=user_id, event_data=event.get_dict(), - stream_type=event.type, + stream_type=EventsStreamData.EVENT_TYPE, store_id=store_id) def on_new_user_event(self, user_id, event_data, stream_type, store_id): diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 427363cad4..895a96b5b9 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -160,7 +160,7 @@ class EventStream(PaginationStream): self.user_id, from_pkey, to_pkey, limit ) - chunk += event_chunk + chunk += [e.get_dict() for e in event_chunk] next_ver.append(str(max_pkey)) defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3451250008..9261984b7e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -255,7 +255,10 @@ class MessageHandler(BaseHandler): ret = [] for event in room_list: - d = event.get_dict() + d = { + "room_id": event.room_id, + "membership": event.membership, + } ret.append(d) if event.membership != Membership.JOIN: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 46b9dbcbbf..750e86040e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -57,7 +57,8 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomTopicEvent.TYPE: yield self._store_room_topic(event) - yield self._store_event(event) + ret = yield self._store_event(event) + defer.returnValue(ret) @defer.inlineCallbacks def get_event(self, event_id): @@ -114,6 +115,9 @@ class DataStore(RoomMemberStore, RoomStore, } ) + latest = yield self.get_room_events_max_id() + defer.returnValue(latest) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c5c3770a40..1300aee8b0 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,7 @@ class StreamStore(SQLBaseStore): ) invites_sql = ( - "SELECT m.event_id FROM room_membershipas as m " + "SELECT m.event_id FROM room_memberships as m " "INNER JOIN current_state_events as c ON m.event_id = c.event_id " "WHERE m.user_id = ? AND m.membership = ?" ) @@ -55,8 +55,9 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events as e WHERE " - "(room_id IN (%(current)s)) OR " - "(event_id IN (%(invites)s)) " + "((room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s))) " + " AND e.ordering > ? AND e.ordering < ? " "ORDER BY ordering ASC LIMIT %(limit)d" ) % { "current": current_room_membership_sql, @@ -66,10 +67,17 @@ class StreamStore(SQLBaseStore): rows = yield self._execute_and_decode( sql, - user_id, user_id, Membership.INVITE + user_id, user_id, Membership.INVITE, from_key, to_key ) - defer.returnValue([self._parse_event_from_row(r) for r in rows]) + ret = [self._parse_event_from_row(r) for r in rows] + + if ret: + max_id = max([r["ordering"] for r in rows]) + else: + max_id = to_key + + defer.returnValue((ret, max_id)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): -- cgit 1.4.1 From 8d1f7632095b949d7726dd72fce10224764f3c11 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 15:53:06 +0100 Subject: Fix pagination to work with new db schema --- synapse/handlers/room.py | 48 +++++++++++++++++++++++------------------------ synapse/storage/stream.py | 31 +++++++++++++++++++++++------- 2 files changed, 48 insertions(+), 31 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9261984b7e..b0b2441b9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -97,30 +97,30 @@ class MessageHandler(BaseHandler): self.notifier.on_new_room_event(event, store_id) yield self.hs.get_federation().handle_new_event(event) -# -# @defer.inlineCallbacks -# def get_messages(self, user_id=None, room_id=None, pagin_config=None, -# feedback=False): -# """Get messages in a room. -# -# Args: -# user_id (str): The user requesting messages. -# room_id (str): The room they want messages from. -# pagin_config (synapse.api.streams.PaginationConfig): The pagination -# config rules to apply, if any. -# feedback (bool): True to get compressed feedback with the messages -# Returns: -# dict: Pagination API results -# """ -# yield self.auth.check_joined_room(room_id, user_id) -# -# data_source = [MessagesStreamData(self.hs, room_id=room_id, -# feedback=feedback)] -# event_stream = EventStream(user_id, data_source) -# pagin_config = yield event_stream.fix_tokens(pagin_config) -# data_chunk = yield event_stream.get_chunk(config=pagin_config) -# defer.returnValue(data_chunk) -# + + @defer.inlineCallbacks + def get_messages(self, user_id=None, room_id=None, pagin_config=None, + feedback=False): + """Get messages in a room. + + Args: + user_id (str): The user requesting messages. + room_id (str): The room they want messages from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + feedback (bool): True to get compressed feedback with the messages + Returns: + dict: Pagination API results + """ + yield self.auth.check_joined_room(room_id, user_id) + + data_source = [EventsStreamData(self.hs, room_id=room_id, + feedback=feedback)] + event_stream = EventStream(user_id, data_source) + pagin_config = yield event_stream.fix_tokens(pagin_config) + data_chunk = yield event_stream.get_chunk(config=pagin_config) + defer.returnValue(data_chunk) + @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): """ Stores data for a room. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1300aee8b0..6bfa00d59a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -53,18 +53,35 @@ class StreamStore(SQLBaseStore): else: limit = 1000 + # From and to keys should be integers from ordering. + from_key = int(from_key) + to_key = int(to_key) + + if from_key == to_key: + defer.returnValue(([], to_key)) + return + + sql = ( "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " - " AND e.ordering > ? AND e.ordering < ? " - "ORDER BY ordering ASC LIMIT %(limit)d" ) % { "current": current_room_membership_sql, "invites": invites_sql, - "limit": limit, } + if from_key < to_key: + sql += ( + "AND e.ordering > ? AND e.ordering < ? " + "ORDER BY ordering ASC LIMIT %(limit)d " + ) % {"limit": limit} + else: + sql += ( + "AND e.ordering < ? AND e.ordering > ? " + "ORDER BY ordering DESC LIMIT %(limit)d " + ) % {"limit": int(limit)} + rows = yield self._execute_and_decode( sql, user_id, user_id, Membership.INVITE, from_key, to_key @@ -72,12 +89,12 @@ class StreamStore(SQLBaseStore): ret = [self._parse_event_from_row(r) for r in rows] - if ret: - max_id = max([r["ordering"] for r in rows]) + if from_key < to_key: + key = max([r["ordering"] for r in rows]) else: - max_id = to_key + key = min([r["ordering"] for r in rows]) - defer.returnValue((ret, max_id)) + defer.returnValue((ret, key)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): -- cgit 1.4.1 From 86be66c34e3d9d3f6b8e6a40ed239f4803550e55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:04:54 +0100 Subject: Actually use MAX_STREAM_SIZE constant. --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6bfa00d59a..47f05a41bd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -51,7 +51,7 @@ class StreamStore(SQLBaseStore): if limit: limit = max(limit, MAX_STREAM_SIZE) else: - limit = 1000 + limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. from_key = int(from_key) -- cgit 1.4.1 From cd2967d271d8127b6f6ebf4aa7a671d3eeca3c59 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:05:46 +0100 Subject: Fix bug when generating a key when get_room_events_stream returned zero rows --- synapse/storage/stream.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 47f05a41bd..bb56f0763d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -89,10 +89,14 @@ class StreamStore(SQLBaseStore): ret = [self._parse_event_from_row(r) for r in rows] - if from_key < to_key: - key = max([r["ordering"] for r in rows]) + + if rows: + if from_key < to_key: + key = max([r["ordering"] for r in rows]) + else: + key = min([r["ordering"] for r in rows]) else: - key = min([r["ordering"] for r in rows]) + key = to_key defer.returnValue((ret, key)) -- cgit 1.4.1 From 8fa3cc37f93cc90d8bf34dadbdf207eb8e2fdebd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:11:25 +0100 Subject: Comment. --- synapse/storage/stream.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index bb56f0763d..7c2c45e0ff 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -71,6 +71,7 @@ class StreamStore(SQLBaseStore): "invites": invites_sql, } + # Constraints and ordering depend on direction. if from_key < to_key: sql += ( "AND e.ordering > ? AND e.ordering < ? " -- cgit 1.4.1 From d260a42ca279fbca46f85b2c96bddc4f814ecef3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:17:36 +0100 Subject: PEP8 cleanups --- synapse/api/events/room.py | 1 + synapse/handlers/room.py | 17 ++++++++++------- synapse/storage/__init__.py | 6 +++++- synapse/storage/_base.py | 3 ++- synapse/storage/room.py | 15 +++++++++++---- synapse/storage/roommember.py | 1 - synapse/storage/stream.py | 2 -- 7 files changed, 29 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index d9b3d572fa..dbf537fb88 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -20,6 +20,7 @@ class GenericEvent(SynapseEvent): def get_content_template(self): return {} + class RoomTopicEvent(SynapseEvent): TYPE = "m.room.topic" diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b0b2441b9f..4ecfb278be 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -114,8 +114,9 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [EventsStreamData(self.hs, room_id=room_id, - feedback=feedback)] + data_source = [ + EventsStreamData(self.hs, room_id=room_id, feedback=feedback) + ] event_stream = EventStream(user_id, data_source) pagin_config = yield event_stream.fix_tokens(pagin_config) data_chunk = yield event_stream.get_chunk(config=pagin_config) @@ -196,7 +197,9 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_current_state(room_id, event_type, state_key) + data = yield self.store.get_current_state( + room_id, event_type, state_key + ) defer.returnValue(data) @defer.inlineCallbacks @@ -496,7 +499,7 @@ class RoomMemberHandler(BaseHandler): SynapseError if there was a problem changing the membership. """ - #broadcast_msg = False + # broadcast_msg = False prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id @@ -570,7 +573,8 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True): + def _do_join(self, event, room_host=None, do_auth=True, + broadcast_msg=True): joinee = self.hs.parse_userid(event.target_user_id) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -621,7 +625,6 @@ class RoomMemberHandler(BaseHandler): broadcast_msg=broadcast_msg, ) - if should_do_dance: yield self._do_invite_join_dance( room_id=room_id, @@ -755,7 +758,7 @@ class RoomMemberHandler(BaseHandler): room_id, "", is_public=False ) - #yield self.state_handler.handle_new_event(event) + # yield self.state_handler.handle_new_event(event) yield federation.handle_new_event(new_event) yield federation.get_state_for_room( target_host, room_id diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 750e86040e..ad36d14a36 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -87,7 +87,11 @@ class DataStore(RoomMemberStore, RoomStore, "content": json.dumps(event.content), } - unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()} + unrec = { + k: v + for k, v in event.get_full_dict().items() + if k not in vals.keys() + } vals["unrecognized_keys"] = json.dumps(unrec) yield self._simple_insert("events", vals) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 413838f798..36cc57c1b8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,7 +60,8 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ if decoder else None + "[SQL] %s Args=%s Func=%s", + query, args, decoder.__name__ if decoder else None ) def interaction(txn): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8f44b67d8c..22f2dcca45 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -87,19 +87,26 @@ class RoomStore(SQLBaseStore): """ topic_subquery = ( - "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics " + "SELECT topics.event_id as event_id, " + "topics.room_id as room_id, topic " + "FROM topics " "INNER JOIN current_state_events as c " "ON c.event_id = topics.event_id " ) name_subquery = ( - "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names " + "SELECT room_names.event_id as event_id, " + "room_names.room_id as room_id, name " + "FROM room_names " "INNER JOIN current_state_events as c " "ON c.event_id = room_names.event_id " ) + # We use non printing ascii character US () as a seperator sql = ( - "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r " + "SELECT r.room_id, n.name, t.topic, " + "group_concat(a.room_alias, '') " + "FROM rooms AS r " "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " @@ -117,7 +124,7 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split(","), + "aliases": r[3].split(""), } for r in rows ] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index a0620677b9..89c87290cf 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -62,7 +62,6 @@ class RoomMemberStore(SQLBaseStore): yield self._execute(None, sql, event.room_id, domain) - @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7c2c45e0ff..cf4b1682b6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -61,7 +61,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], to_key)) return - sql = ( "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " @@ -90,7 +89,6 @@ class StreamStore(SQLBaseStore): ret = [self._parse_event_from_row(r) for r in rows] - if rows: if from_key < to_key: key = max([r["ordering"] for r in rows]) -- cgit 1.4.1 From 506711749f674c2b408f4034662240883e0e2764 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:45:16 +0100 Subject: We no longer need to special case room config events. --- synapse/storage/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index ad36d14a36..841ad8f132 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -50,8 +50,8 @@ class DataStore(RoomMemberStore, RoomStore, yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: yield self._store_feedback(event) - elif event.type == RoomConfigEvent.TYPE: - yield self._store_room_config(event) +# elif event.type == RoomConfigEvent.TYPE: +# yield self._store_room_config(event) elif event.type == RoomNameEvent.TYPE: yield self._store_room_name(event) elif event.type == RoomTopicEvent.TYPE: -- cgit 1.4.1 From 0e938b1ff75bdeffefb3f3a3260972a19d99d4d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Aug 2014 16:47:48 +0100 Subject: Rename method name to not clash with other ones in storage. --- synapse/state.py | 2 +- synapse/storage/pdu.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/state.py b/synapse/state.py index 4f8b4d9760..ca8e1ca630 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -86,7 +86,7 @@ class StateHandler(object): else: event.depth = 0 - current_state = yield self.store.get_current_state( + current_state = yield self.store.get_current_state_pdu( key.context, key.type, key.state_key ) diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 13adc581e1..a24ce7ab78 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -580,7 +580,7 @@ class StatePduStore(SQLBaseStore): txn.execute(query, query_args) - def get_current_state(self, context, pdu_type, state_key): + def get_current_state_pdu(self, context, pdu_type, state_key): """For a given context, pdu_type, state_key 3-tuple, return what is currently considered the current state. @@ -595,10 +595,10 @@ class StatePduStore(SQLBaseStore): """ return self._db_pool.runInteraction( - self._get_current_state, context, pdu_type, state_key + self._get_current_state_pdu, context, pdu_type, state_key ) - def _get_current_state(self, txn, context, pdu_type, state_key): + def _get_current_state_pdu(self, txn, context, pdu_type, state_key): return self._get_current_interaction(txn, context, pdu_type, state_key) def _get_current_interaction(self, txn, context, pdu_type, state_key): -- cgit 1.4.1 From fc26275bb34206f48d70c7effcbc6f5d0bf86ecb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 15:50:41 +0100 Subject: Add two different columns for ordering the events table, one which can be used for pagination and one which can be as tokens for notifying clients. Also add a 'processed' field which is currently always set to True --- synapse/federation/handler.py | 4 ++-- synapse/storage/__init__.py | 18 ++++++++++++++---- synapse/storage/schema/im.sql | 13 ++++++++----- synapse/storage/stream.py | 17 +++++++++-------- 4 files changed, 33 insertions(+), 19 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 580e591aca..68243d31d0 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -63,7 +63,7 @@ class FederationEventHandler(object): Deferred: Resolved when it has successfully been queued for processing. """ - yield self._fill_out_prev_events(event) + yield self.fill_out_prev_events(event) pdu = self.pdu_codec.pdu_from_event(event) @@ -129,7 +129,7 @@ class FederationEventHandler(object): yield self.event_handler.on_receive(new_state_event) @defer.inlineCallbacks - def _fill_out_prev_events(self, event): + def fill_out_prev_events(self, event): if hasattr(event, "prev_events"): return diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 841ad8f132..9f78f3f9bd 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -43,9 +43,10 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) self.event_factory = hs.get_event_factory() + self.hs = hs @defer.inlineCallbacks - def persist_event(self, event): + def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: @@ -57,7 +58,7 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomTopicEvent.TYPE: yield self._store_room_topic(event) - ret = yield self._store_event(event) + ret = yield self._store_event(event, backfilled) defer.returnValue(ret) @defer.inlineCallbacks @@ -79,14 +80,23 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event) @defer.inlineCallbacks - def _store_event(self, event): + def _store_event(self, event, backfilled): + # FIXME (erikj): This should be removed when we start amalgamating + # event and pdu storage. + yield self.hs.get_federation().fill_out_prev_events(event) + vals = { + "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, "content": json.dumps(event.content), + "processed": True, } + if backfilled: + vals["token_ordering"] = "-1" + unrec = { k: v for k, v in event.get_full_dict().items() @@ -96,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore, yield self._simple_insert("events", vals) - if hasattr(event, "state_key"): + if not backfilled and hasattr(event, "state_key"): vals = { "event_id": event.event_id, "room_id": event.room_id, diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 2452890ea4..b0240e39af 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -14,12 +14,15 @@ */ CREATE TABLE IF NOT EXISTS events( - ordering INTEGER PRIMARY KEY AUTOINCREMENT, + token_ordering INTEGER AUTOINCREMENT, + topological_ordering INTEGER NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, - room_id TEXT, - content TEXT, - unrecognized_keys TEXT + room_id TEXT NOT NULL, + content TEXT NOT NULL, + unrecognized_keys TEXT, + processed BOOL NOT NULL, + CONSTRAINT ev_uniq UNIQUE (event_id) ); CREATE TABLE IF NOT EXISTS state_events( @@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS current_state_events( room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, - CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE + CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE ); CREATE TABLE IF NOT EXISTS room_memberships( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index cf4b1682b6..f7968f576f 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -73,13 +73,14 @@ class StreamStore(SQLBaseStore): # Constraints and ordering depend on direction. if from_key < to_key: sql += ( - "AND e.ordering > ? AND e.ordering < ? " - "ORDER BY ordering ASC LIMIT %(limit)d " + "AND e.token_ordering > ? AND e.token_ordering < ? " + "ORDER BY token_ordering, rowid ASC LIMIT %(limit)d " ) % {"limit": limit} else: sql += ( - "AND e.ordering < ? AND e.ordering > ? " - "ORDER BY ordering DESC LIMIT %(limit)d " + "AND e.token_ordering < ? " + "AND e.token_ordering > ? " + "ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d " ) % {"limit": int(limit)} rows = yield self._execute_and_decode( @@ -91,9 +92,9 @@ class StreamStore(SQLBaseStore): if rows: if from_key < to_key: - key = max([r["ordering"] for r in rows]) + key = max([r["token_ordering"] for r in rows]) else: - key = min([r["ordering"] for r in rows]) + key = min([r["token_ordering"] for r in rows]) else: key = to_key @@ -105,7 +106,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events WHERE room_id = ? " - "ORDER BY ordering DESC LIMIT ? " + "ORDER BY token_ordering, rowid DESC LIMIT ? " ) rows = yield self._execute_and_decode( @@ -120,7 +121,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_max_id(self): res = yield self._execute_and_decode( - "SELECT MAX(ordering) as m FROM events" + "SELECT MAX(token_ordering) as m FROM events" ) if not res: -- cgit 1.4.1 From 709a92cee89709811c51cac7d8c66922093be673 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 16:00:46 +0100 Subject: SQL doesn't allow AUTOINCREMENT on non PRIMARY KEY columns. --- synapse/storage/__init__.py | 21 +++++++++++++++++++-- synapse/storage/schema/im.sql | 2 +- tests/rest/test_presence.py | 2 ++ tests/rest/test_profile.py | 1 + 4 files changed, 23 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 9f78f3f9bd..b846081d49 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,6 +45,9 @@ class DataStore(RoomMemberStore, RoomStore, self.event_factory = hs.get_event_factory() self.hs = hs + self.min_token_deferred = self._get_min_token() + self.min_token = None + @defer.inlineCallbacks def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: @@ -82,7 +85,7 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks def _store_event(self, event, backfilled): # FIXME (erikj): This should be removed when we start amalgamating - # event and pdu storage. + # event and pdu storage yield self.hs.get_federation().fill_out_prev_events(event) vals = { @@ -95,7 +98,10 @@ class DataStore(RoomMemberStore, RoomStore, } if backfilled: - vals["token_ordering"] = "-1" + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + vals["token_ordering"] = self.min_token unrec = { k: v @@ -151,6 +157,17 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue([self._parse_event_from_row(r) for r in results]) + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + None, + "SELECT MIN(token_ordering) FROM events" + ) + + self.min_token = rows[0][0] if rows and rows[0] else 0 + + defer.returnValue(self.min_token) + def schema_path(schema): """ Get a filesystem path for the named database schema diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index b0240e39af..0fb3dbee55 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS events( - token_ordering INTEGER AUTOINCREMENT, + token_ordering INTEGER PRIMARY KEY AUTOINCREMENT, topological_ordering INTEGER NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, diff --git a/tests/rest/test_presence.py b/tests/rest/test_presence.py index 91d4d1ff6c..99287823e4 100644 --- a/tests/rest/test_presence.py +++ b/tests/rest/test_presence.py @@ -51,6 +51,7 @@ class PresenceStateTestCase(unittest.TestCase): hs = HomeServer("test", db_pool=None, http_client=None, + datastore=None, resource_for_client=self.mock_server, resource_for_federation=self.mock_server, ) @@ -109,6 +110,7 @@ class PresenceListTestCase(unittest.TestCase): hs = HomeServer("test", db_pool=None, http_client=None, + datastore=None, resource_for_client=self.mock_server, resource_for_federation=self.mock_server ) diff --git a/tests/rest/test_profile.py b/tests/rest/test_profile.py index ff1e92805e..ee47daf4c2 100644 --- a/tests/rest/test_profile.py +++ b/tests/rest/test_profile.py @@ -46,6 +46,7 @@ class ProfileTestCase(unittest.TestCase): resource_for_client=self.mock_server, federation=Mock(), replication_layer=Mock(), + datastore=None, ) def _get_user_by_token(token=None): -- cgit 1.4.1 From 4eb8f84aa8fc735e228f66d11c355625bb14c1cf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 16:20:21 +0100 Subject: Make snapshot_all_rooms return results in the correct form, including start and end tokens. --- synapse/handlers/room.py | 9 +++++++-- synapse/storage/stream.py | 25 +++++++++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cdc98d2b08..9e10235fa0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -269,11 +269,16 @@ class MessageHandler(BaseHandler): if event.membership != Membership.JOIN: continue try: - messages = yield self.store.get_recent_events_for_room( + messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=50, ) - d["messages"] = [m.get_dict() for m in messages] + + d["messages"] = { + "chunk": [m.get_dict() for m in messages], + "start": token[0], + "end": token[1], + } except: pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f7968f576f..6728a4b5ea 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -104,19 +104,36 @@ class StreamStore(SQLBaseStore): def get_recent_events_for_room(self, room_id, limit, with_feedback=False): # TODO (erikj): Handle compressed feedback + end_token = yield self.get_room_events_max_id() + sql = ( - "SELECT * FROM events WHERE room_id = ? " - "ORDER BY token_ordering, rowid DESC LIMIT ? " + "SELECT * FROM events WHERE " + "WHERE room_id = ? AND token_ordering <= ? " + "ORDER BY topological_ordering, rowid DESC LIMIT ? " ) rows = yield self._execute_and_decode( sql, - room_id, limit + room_id, end_token, limit ) rows.reverse() # As we selected with reverse ordering - defer.returnValue([self._parse_event_from_row(r) for r in rows]) + if rows: + topo = rows[0]["topological_ordering"] + row_id = rows[0]["rowid"] + start_token = "p%s-%s" % (topo, row_id) + + token = (start_token, end_token) + else: + token = ("START", end_token) + + defer.returnValue( + ( + [self._parse_event_from_row(r) for r in rows], + token + ) + ) @defer.inlineCallbacks def get_room_events_max_id(self): -- cgit 1.4.1 From 1422a22970e4ab3f7a37fe672af2f1d1de901c10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Aug 2014 16:25:18 +0100 Subject: Fix typos in SQL and where we still had rowid's (which no longer exist) --- synapse/handlers/room.py | 2 +- synapse/storage/stream.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9e10235fa0..e5dd2e245f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -280,7 +280,7 @@ class MessageHandler(BaseHandler): "end": token[1], } except: - pass + logger.exception("Failed to get snapshot") logger.debug("snapshot_all_rooms returning: %s", ret) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6728a4b5ea..c03c983e14 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -74,13 +74,13 @@ class StreamStore(SQLBaseStore): if from_key < to_key: sql += ( "AND e.token_ordering > ? AND e.token_ordering < ? " - "ORDER BY token_ordering, rowid ASC LIMIT %(limit)d " + "ORDER BY token_ordering ASC LIMIT %(limit)d " ) % {"limit": limit} else: sql += ( "AND e.token_ordering < ? " "AND e.token_ordering > ? " - "ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d " + "ORDER BY e.token_ordering DESC LIMIT %(limit)d " ) % {"limit": int(limit)} rows = yield self._execute_and_decode( @@ -107,9 +107,9 @@ class StreamStore(SQLBaseStore): end_token = yield self.get_room_events_max_id() sql = ( - "SELECT * FROM events WHERE " + "SELECT * FROM events " "WHERE room_id = ? AND token_ordering <= ? " - "ORDER BY topological_ordering, rowid DESC LIMIT ? " + "ORDER BY topological_ordering, token_ordering DESC LIMIT ? " ) rows = yield self._execute_and_decode( @@ -121,8 +121,8 @@ class StreamStore(SQLBaseStore): if rows: topo = rows[0]["topological_ordering"] - row_id = rows[0]["rowid"] - start_token = "p%s-%s" % (topo, row_id) + toke = rows[0]["token_ordering"] + start_token = "p%s-%s" % (topo, toke) token = (start_token, end_token) else: -- cgit 1.4.1 From 598a1d8ff953c70f9f54564225d693a1bcf42144 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:19:48 +0100 Subject: Change the way pagination works to support out of order events. --- synapse/api/streams/__init__.py | 19 ++- synapse/api/streams/event.py | 92 +++++----- synapse/handlers/presence.py | 2 +- synapse/handlers/room.py | 3 +- synapse/storage/schema/im.sql | 2 +- synapse/storage/stream.py | 186 +++++++++++++++++---- .../components/matrix/event-stream-service.js | 1 - webclient/components/matrix/matrix-service.js | 4 +- 8 files changed, 227 insertions(+), 82 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py index 989e63f9ec..44f4cc6078 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/api/streams/__init__.py @@ -20,23 +20,23 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" - def __init__(self, from_tok=None, to_tok=None, limit=0): + def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): self.from_tok = from_tok self.to_tok = to_tok + self.direction = direction self.limit = limit @classmethod def from_request(cls, request, raise_invalid_params=True): params = { - "from_tok": PaginationStream.TOK_START, - "to_tok": PaginationStream.TOK_END, - "limit": 0 + "direction": 'f', } query_param_mappings = [ # 3-tuple of qp_key, attribute, rules ("from", "from_tok", lambda x: type(x) == str), ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()) + ("limit", "limit", lambda x: x.isdigit()), + ("dir", "direction", lambda x: x == 'f' or x == 'b'), ] for qp, attr, is_valid in query_param_mappings: @@ -48,12 +48,17 @@ class PaginationConfig(object): return PaginationConfig(**params) + def __str__(self): + return ( + "" + ) % (self.from_tok, self.to_tok, self.direction, self.limit) + class PaginationStream(object): """ An interface for streaming data as chunks. """ - TOK_START = "START" TOK_END = "END" def get_chunk(self, config=None): @@ -76,7 +81,7 @@ class StreamData(object): self.hs = hs self.store = hs.get_datastore() - def get_rows(self, user_id, from_pkey, to_pkey, limit): + def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): """ Get event stream data between the specified pkeys. Args: diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 414b05be30..a5c8b2b31f 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -38,8 +38,8 @@ class EventsStreamData(StreamData): self.with_feedback = feedback @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - data, latest_ver = yield self.store.get_room_events_stream( + def get_rows(self, user_id, from_key, to_key, limit, direction): + data, latest_ver = yield self.store.get_room_events( user_id=user_id, from_key=from_key, to_key=to_key, @@ -70,6 +70,15 @@ class EventStream(PaginationStream): pagination_config.from_tok) pagination_config.to_tok = yield self.fix_token( pagination_config.to_tok) + + if ( + not pagination_config.to_tok + and pagination_config.direction == 'f' + ): + pagination_config.to_tok = yield self.get_current_max_token() + + logger.debug("pagination_config: %s", pagination_config) + defer.returnValue(pagination_config) @defer.inlineCallbacks @@ -81,39 +90,42 @@ class EventStream(PaginationStream): Returns: The fixed-up token, which may == token. """ - # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. - replacements = [ - (PaginationStream.TOK_START, "0"), - (PaginationStream.TOK_END, "-1") - ] - for magic_token, key in replacements: - if magic_token == token: - token = EventStream.SEPARATOR.join( - [key] * len(self.stream_data) - ) - - # replace -1 values with an actual pkey - token_segments = self._split_token(token) - for i, tok in enumerate(token_segments): - if tok == -1: - # add 1 to the max token because results are EXCLUSIVE from the - # latest version. - token_segments[i] = 1 + (yield self.stream_data[i].max_token()) - defer.returnValue(EventStream.SEPARATOR.join( - str(x) for x in token_segments - )) + if token == PaginationStream.TOK_END: + new_token = yield self.get_current_max_token() + + logger.debug("fix_token: From %s to %s", token, new_token) + + token = new_token + + defer.returnValue(token) @defer.inlineCallbacks - def get_chunk(self, config=None): + def get_current_max_token(self): + new_token_parts = [] + for s in self.stream_data: + mx = yield s.max_token() + new_token_parts.append(str(mx)) + + new_token = EventStream.SEPARATOR.join(new_token_parts) + + logger.debug("get_current_max_token: %s", new_token) + + defer.returnValue(new_token) + + @defer.inlineCallbacks + def get_chunk(self, config): # no support for limit on >1 streams, makes no sense. if config.limit and len(self.stream_data) > 1: raise EventStreamError( 400, "Limit not supported on multiplexed streams." ) - (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, - config.to_tok, - config.limit) + chunk_data, next_tok = yield self._get_chunk_data( + config.from_tok, + config.to_tok, + config.limit, + config.direction, + ) defer.returnValue({ "chunk": chunk_data, @@ -122,7 +134,7 @@ class EventStream(PaginationStream): }) @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit): + def _get_chunk_data(self, from_tok, to_tok, limit, direction): """ Get event data between the two tokens. Tokens are SEPARATOR separated values representing pkey values of @@ -140,11 +152,12 @@ class EventStream(PaginationStream): EventStreamError if something went wrong. """ # sanity check - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") + if to_tok is not None: + if (from_tok.count(EventStream.SEPARATOR) != + to_tok.count(EventStream.SEPARATOR) or + (from_tok.count(EventStream.SEPARATOR) + 1) != + len(self.stream_data)): + raise EventStreamError(400, "Token lengths don't match.") chunk = [] next_ver = [] @@ -158,7 +171,7 @@ class EventStream(PaginationStream): continue (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit + self.user_id, from_pkey, to_pkey, limit, direction, ) chunk.extend([ @@ -177,9 +190,8 @@ class EventStream(PaginationStream): Returns: A list of ints. """ - segments = token.split(EventStream.SEPARATOR) - try: - int_segments = [int(x) for x in segments] - except ValueError: - raise EventStreamError(400, "Bad token: %s" % token) - return int_segments + if token: + segments = token.split(EventStream.SEPARATOR) + else: + segments = [None] * len(self.stream_data) + return segments diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e8cb83eddb..f140dc527a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -684,7 +684,7 @@ class PresenceStreamData(StreamData): super(PresenceStreamData, self).__init__(hs) self.presence = hs.get_handlers().presence_handler - def get_rows(self, user_id, from_key, to_key, limit): + def get_rows(self, user_id, from_key, to_key, limit, direction): cachemap = self.presence._user_cachemap # TODO(paul): limit, and filter by visibility diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e5dd2e245f..40867ae2e0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -466,7 +466,7 @@ class RoomMemberHandler(BaseHandler): for entry in member_list ] chunk_data = { - "start": "START", + "start": "START", # FIXME (erikj): START is no longer a valid value "end": "END", "chunk": event_list } @@ -811,4 +811,5 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): chunk = yield self.store.get_rooms(is_public=True) + # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 0fb3dbee55..ea04261ff0 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS events( - token_ordering INTEGER PRIMARY KEY AUTOINCREMENT, + stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, topological_ordering INTEGER NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c03c983e14..87fc978813 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -16,8 +16,9 @@ from twisted.internet import defer from ._base import SQLBaseStore - +from synapse.api.errors import SynapseError from synapse.api.constants import Membership +from synapse.util.logutils import log_function import json import logging @@ -29,9 +30,96 @@ logger = logging.getLogger(__name__) MAX_STREAM_SIZE = 1000 +_STREAM_TOKEN = "stream" +_TOPOLOGICAL_TOKEN = "topological" + + +def _parse_stream_token(string): + try: + if string[0] != 's': + raise + return int(string[1:]) + except: + logger.debug("Not stream token: %s", string) + raise SynapseError(400, "Invalid token") + + +def _parse_topological_token(string): + try: + if string[0] != 't': + raise + parts = string[1:].split('-', 1) + return (int(parts[0]), int(parts[1])) + except: + logger.debug("Not topological token: %s", string) + raise SynapseError(400, "Invalid token") + + +def is_stream_token(string): + try: + _parse_stream_token(string) + return True + except: + return False + + +def is_topological_token(string): + try: + _parse_topological_token(string) + return True + except: + return False + + +def _get_token_bound(token, comparison): + try: + s = _parse_stream_token(token) + return "%s %s %d" % ("stream_ordering", comparison, s) + except: + pass + + try: + top, stream = _parse_topological_token(token) + return "%s %s %d AND %s %s %d" % ( + "topological_ordering", comparison, top, + "stream_ordering", comparison, stream, + ) + except: + pass + + raise SynapseError(400, "Invalid token") + + class StreamStore(SQLBaseStore): + @log_function + def get_room_events(self, user_id, from_key, to_key, room_id, limit=0, + direction='f', with_feedback=False): + is_events = ( + direction == 'f' + and is_stream_token(from_key) + and to_key and is_stream_token(to_key) + ) + + if is_events: + return self.get_room_events_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + room_id=room_id, + limit=limit, + with_feedback=with_feedback, + ) + else: + return self.paginate_room_events( + from_key=from_key, + to_key=to_key, + room_id=room_id, + limit=limit, + with_feedback=with_feedback, + ) @defer.inlineCallbacks + @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): # TODO (erikj): Handle compressed feedback @@ -54,8 +142,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_key = int(from_key) - to_key = int(to_key) + from_id = _parse_stream_token(from_key) + to_id = _parse_stream_token(to_key) if from_key == to_key: defer.returnValue(([], to_key)) @@ -65,41 +153,78 @@ class StreamStore(SQLBaseStore): "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " + "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "current": current_room_membership_sql, "invites": invites_sql, + "limit": limit } - # Constraints and ordering depend on direction. - if from_key < to_key: - sql += ( - "AND e.token_ordering > ? AND e.token_ordering < ? " - "ORDER BY token_ordering ASC LIMIT %(limit)d " - ) % {"limit": limit} - else: - sql += ( - "AND e.token_ordering < ? " - "AND e.token_ordering > ? " - "ORDER BY e.token_ordering DESC LIMIT %(limit)d " - ) % {"limit": int(limit)} - rows = yield self._execute_and_decode( sql, - user_id, user_id, Membership.INVITE, from_key, to_key + user_id, user_id, Membership.INVITE, from_id, to_id ) ret = [self._parse_event_from_row(r) for r in rows] if rows: - if from_key < to_key: - key = max([r["token_ordering"] for r in rows]) - else: - key = min([r["token_ordering"] for r in rows]) + key = "s%d" % max([r["stream_ordering"] for r in rows]) else: + # Assume we didn't get anything because there was nothing to get. key = to_key defer.returnValue((ret, key)) + @defer.inlineCallbacks + @log_function + def paginate_room_events(self, room_id, from_key, to_key=None, + direction='b', limit=-1, + with_feedback=False): + # TODO (erikj): Handle compressed feedback + + from_comp = '<' if direction =='b' else '>' + to_comp = '>' if direction =='b' else '<' + order = "DESC" if direction == 'b' else "ASC" + + args = [room_id] + + bounds = _get_token_bound(from_key, from_comp) + if to_key: + bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp)) + + if int(limit) > 0: + args.append(int(limit)) + limit_str = " LIMIT ?" + else: + limit_str = "" + + sql = ( + "SELECT * FROM events " + "WHERE room_id = ? AND %(bounds)s " + "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " + ) % {"bounds": bounds, "order": order, "limit": limit_str} + + rows = yield self._execute_and_decode( + sql, + *args + ) + + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + next_token = "t%s-%s" % (topo, toke) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key + + defer.returnValue( + ( + [self._parse_event_from_row(r) for r in rows], + next_token + ) + ) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False): # TODO (erikj): Handle compressed feedback @@ -108,8 +233,8 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events " - "WHERE room_id = ? AND token_ordering <= ? " - "ORDER BY topological_ordering, token_ordering DESC LIMIT ? " + "WHERE room_id = ? AND stream_ordering <= ? " + "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? " ) rows = yield self._execute_and_decode( @@ -121,12 +246,12 @@ class StreamStore(SQLBaseStore): if rows: topo = rows[0]["topological_ordering"] - toke = rows[0]["token_ordering"] + toke = rows[0]["stream_ordering"] start_token = "p%s-%s" % (topo, toke) token = (start_token, end_token) else: - token = ("START", end_token) + token = (end_token, end_token) defer.returnValue( ( @@ -138,11 +263,14 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_max_id(self): res = yield self._execute_and_decode( - "SELECT MAX(token_ordering) as m FROM events" + "SELECT MAX(stream_ordering) as m FROM events" ) - if not res: - defer.returnValue(0) + logger.debug("get_room_events_max_id: %s", res) + + if not res or not res[0] or not res[0]["m"]: + defer.returnValue("s1") return - defer.returnValue(res[0]["m"]) + key = res[0]["m"] + 1 + defer.returnValue("s%d" % (key,)) diff --git a/webclient/components/matrix/event-stream-service.js b/webclient/components/matrix/event-stream-service.js index c94cf0fe72..a446fad5d4 100644 --- a/webclient/components/matrix/event-stream-service.js +++ b/webclient/components/matrix/event-stream-service.js @@ -25,7 +25,6 @@ the eventHandlerService. angular.module('eventStreamService', []) .factory('eventStreamService', ['$q', '$timeout', 'matrixService', 'eventHandlerService', function($q, $timeout, matrixService, eventHandlerService) { var END = "END"; - var START = "START"; var TIMEOUT_MS = 30000; var ERR_TIMEOUT_MS = 5000; diff --git a/webclient/components/matrix/matrix-service.js b/webclient/components/matrix/matrix-service.js index c52c94c310..3cd0aa674b 100644 --- a/webclient/components/matrix/matrix-service.js +++ b/webclient/components/matrix/matrix-service.js @@ -230,8 +230,8 @@ angular.module('matrixService', []) path = path.replace("$room_id", room_id); var params = { from: from_token, - to: "START", - limit: limit + limit: limit, + dir: 'b' }; return doRequest("GET", path, params); }, -- cgit 1.4.1 From 75b6d982a01a431a89d2ab76d91a09159630d059 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:20:03 +0100 Subject: Add a 'backfill room' button --- synapse/federation/handler.py | 22 +++++++++++++++------- synapse/federation/replication.py | 9 ++++++--- synapse/handlers/federation.py | 21 ++++++++++++++++++--- synapse/rest/room.py | 16 ++++++++++++++++ synapse/storage/__init__.py | 22 ++++++++++++++++++---- synapse/storage/pdu.py | 16 ++++++++-------- 6 files changed, 81 insertions(+), 25 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 68243d31d0..984c1558e9 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -74,10 +74,18 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def backfill(self, room_id, limit): - # TODO: Work out which destinations to ask for backfill - # self.replication_layer.backfill(dest, room_id, limit) - pass + def backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) @log_function def get_state_for_room(self, destination, room_id): @@ -87,7 +95,7 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu): + def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it throught the StateHandler. """ @@ -95,7 +103,7 @@ class FederationEventHandler(object): try: with (yield self.lock_manager.lock(pdu.context)): - if event.is_state: + if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state( pdu ) @@ -104,7 +112,7 @@ class FederationEventHandler(object): else: is_new_state = False - yield self.event_handler.on_receive(event, is_new_state) + yield self.event_handler.on_receive(event, is_new_state, backfilled) except AuthError: # TODO: Implement something in federation that allows us to diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index bc9df2f214..3e5f1a4108 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -208,7 +208,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(pdu, backfilled=True) defer.returnValue(pdus) @@ -415,7 +415,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu): + def _handle_new_pdu(self, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -451,7 +451,10 @@ class ReplicationLayer(object): # Persist the Pdu, but don't mark it as processed yet. yield self.pdu_actions.persist_received(pdu) - ret = yield self.handler.on_receive_pdu(pdu) + if not backfilled: + ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + else: + ret = None yield self.pdu_actions.mark_as_processed(pdu) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7026df90a2..ef9ed274df 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -35,7 +35,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state): + def on_receive(self, event, is_new_state, backfilled): if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -70,6 +70,21 @@ class FederationHandler(BaseHandler): else: with (yield self.room_lock.lock(event.room_id)): - store_id = yield self.store.persist_event(event) + store_id = yield self.store.persist_event(event, backfilled) - yield self.notifier.on_new_room_event(event, store_id) + if not backfilled: + yield self.notifier.on_new_room_event(event, store_id) + + + @log_function + @defer.inlineCallbacks + def backfill(self, dest, room_id, limit): + events = yield self.hs.get_federation().backfill(dest, room_id, limit) + + for event in events: + try: + yield self.store.persist_event(event, backfilled=True) + except: + logger.debug("Failed to persiste event: %s", event) + + defer.returnValue(events) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index dfb2aabe70..89ea9f0d25 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -383,6 +383,21 @@ class RoomMessageListRestServlet(RestServlet): defer.returnValue((200, msgs)) +class RoomTriggerBackfill(RestServlet): + PATTERN = client_path_pattern("/rooms/(?P[^/]*)/backfill$") + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + remote_server = urllib.unquote(request.args["remote"][0]) + room_id = urllib.unquote(room_id) + limit = int(request.args["limit"][0]) + + handler = self.handlers.federation_handler + events = yield handler.backfill(remote_server, room_id, limit) + + res = [event.get_dict() for event in events] + defer.returnValue((200, res)) + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -403,3 +418,4 @@ def register_servlets(hs, http_server): RoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) + RoomTriggerBackfill(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b846081d49..2243a710db 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,6 +20,8 @@ from synapse.api.events.room import ( RoomConfigEvent, RoomNameEvent, ) +from synapse.util.logutils import log_function + from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore from .transactions import TransactionStore import json +import logging import os +logger = logging.getLogger(__name__) + + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, @@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token = None @defer.inlineCallbacks + @log_function def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: yield self._store_room_member(event) @@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event) @defer.inlineCallbacks + @log_function def _store_event(self, event, backfilled): # FIXME (erikj): This should be removed when we start amalgamating # event and pdu storage @@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore, if not self.min_token_deferred.called: yield self.min_token_deferred self.min_token -= 1 - vals["token_ordering"] = self.min_token + vals["stream_ordering"] = self.min_token unrec = { k: v @@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore, } vals["unrecognized_keys"] = json.dumps(unrec) - yield self._simple_insert("events", vals) + try: + yield self._simple_insert("events", vals) + except: + logger.exception("Failed to persist, probably duplicate") + return if not backfilled and hasattr(event, "state_key"): vals = { @@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore, def _get_min_token(self): row = yield self._execute( None, - "SELECT MIN(token_ordering) FROM events" + "SELECT MIN(stream_ordering) FROM events" ) - self.min_token = rows[0][0] if rows and rows[0] else 0 + self.min_token = min(row[0][0], -1) if row and row[0] else -1 + + logger.debug("min_token is: %s", self.min_token) defer.returnValue(self.min_token) diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index a24ce7ab78..7655f43ede 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table, JoinHelper from synapse.util.logutils import log_function @@ -319,6 +321,7 @@ class PduStore(SQLBaseStore): return [(row[0], row[1], row[2]) for row in results] + @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): """Get a list of Pdus that we haven't backfilled beyond yet (and haven't seen). This list is used when we want to backfill backwards and is the @@ -331,17 +334,14 @@ class PduStore(SQLBaseStore): Returns: list: A list of PduIdTuple. """ - return self._db_pool.runInteraction( - self._get_oldest_pdus_in_context, context - ) - - def _get_oldest_pdus_in_context(self, txn, context): - txn.execute( + results = yield self._execute( + None, "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" % {"back": PduBackwardExtremitiesTable.table_name, }, - (context,) + context ) - return [PduIdTuple(i, o) for i, o in txn.fetchall()] + + defer.returnValue([PduIdTuple(i, o) for i, o in results]) def is_pdu_new(self, pdu_id, origin, context, depth): """For a given Pdu, try and figure out if it's 'new', i.e., if it's -- cgit 1.4.1 From 840771190fc11e51ee0810749a99d87186ccb78b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 14:32:47 +0100 Subject: Fix bug where we sometimes set min_token to None. --- synapse/storage/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2243a710db..470b7b7663 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -176,7 +176,8 @@ class DataStore(RoomMemberStore, RoomStore, "SELECT MIN(stream_ordering) FROM events" ) - self.min_token = min(row[0][0], -1) if row and row[0] else -1 + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) logger.debug("min_token is: %s", self.min_token) -- cgit 1.4.1 From d94765999d26fd38be4eb37b9be8a3da112b5b82 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 16:40:25 +0100 Subject: Add comment about what strorage.stream does --- synapse/storage/stream.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 87fc978813..18c1002e2c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -13,6 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" This module is responsible for getting events from the DB for pagination +and event streaming. + +The order it returns events in depend on whether we are streaming forwards or +are paginating backwards. We do this because we want to handle out of order +messages nicely, while still returning them in the correct order when we +paginate bacwards. + +This is implemented by keeping two ordering columns: stream_ordering and +topological_ordering. Stream ordering is basically insertion/received order +(except for events from backfill requests). The topolgical_ordering is a +weak ordering of events based on the pdu graph. + +This means that we have to have two different types of tokens, depending on +what sort order was used: + - stream tokens are of the form: "s%d", which maps directly to the column + - topological tokems: "t%d-%d", where the integers map to the topological + and stream ordering columns respectively. +""" + from twisted.internet import defer from ._base import SQLBaseStore -- cgit 1.4.1 From eea2dc7dde1b06af927ff34c723a9414fef5255e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 16:40:38 +0100 Subject: Remove debug logging from token parsing funcs. --- synapse/storage/stream.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 18c1002e2c..3a67baa261 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -60,7 +60,6 @@ def _parse_stream_token(string): raise return int(string[1:]) except: - logger.debug("Not stream token: %s", string) raise SynapseError(400, "Invalid token") @@ -71,7 +70,6 @@ def _parse_topological_token(string): parts = string[1:].split('-', 1) return (int(parts[0]), int(parts[1])) except: - logger.debug("Not topological token: %s", string) raise SynapseError(400, "Invalid token") -- cgit 1.4.1 From ae493c9418b9fa2ad2e9686ff29117ae271e8cfd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 16:45:55 +0100 Subject: Fix token to correct format --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3a67baa261..895a61fbc1 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -265,7 +265,7 @@ class StreamStore(SQLBaseStore): if rows: topo = rows[0]["topological_ordering"] toke = rows[0]["stream_ordering"] - start_token = "p%s-%s" % (topo, toke) + start_token = "t%s-%s" % (topo, toke) token = (start_token, end_token) else: -- cgit 1.4.1 From d4fb1c8a924911ea7a24b927157f8ae21087f631 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Aug 2014 17:18:19 +0100 Subject: Only hit get_room_events_stream if we have a valid user_id --- synapse/storage/stream.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 895a61fbc1..f2be275641 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -112,8 +112,11 @@ class StreamStore(SQLBaseStore): @log_function def get_room_events(self, user_id, from_key, to_key, room_id, limit=0, direction='f', with_feedback=False): + # We deal with events request in two different ways depending on if + # this looks like an /events request or a pagination request. is_events = ( direction == 'f' + and user_id and is_stream_token(from_key) and to_key and is_stream_token(to_key) ) -- cgit 1.4.1 From e8244c23baf150c59e737761c15ce540a3e9e26f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Aug 2014 15:53:07 +0100 Subject: Give the event_id of the failed event --- synapse/storage/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 470b7b7663..7732906927 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -121,7 +121,10 @@ class DataStore(RoomMemberStore, RoomStore, try: yield self._simple_insert("events", vals) except: - logger.exception("Failed to persist, probably duplicate") + logger.exception( + "Failed to persist, probably duplicate: %s", + event_id + ) return if not backfilled and hasattr(event, "state_key"): -- cgit 1.4.1 From ebd3c41edeebcceb990fc3d60974c5166a7d143a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Aug 2014 16:07:20 +0100 Subject: Make event stream storage return all membership events about the user, regardless of if they were in the room or not. --- synapse/storage/stream.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f2be275641..e994017bf2 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -151,10 +151,12 @@ class StreamStore(SQLBaseStore): "WHERE m.user_id = ?" ) - invites_sql = ( + # We also want to get any membership events about that user, e.g. + # invites or leave notifications. + membership_sql = ( "SELECT m.event_id FROM room_memberships as m " "INNER JOIN current_state_events as c ON m.event_id = c.event_id " - "WHERE m.user_id = ? AND m.membership = ?" + "WHERE m.user_id = ? " ) if limit: @@ -178,13 +180,13 @@ class StreamStore(SQLBaseStore): "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "current": current_room_membership_sql, - "invites": invites_sql, + "invites": membership_sql, "limit": limit } rows = yield self._execute_and_decode( sql, - user_id, user_id, Membership.INVITE, from_id, to_id + user_id, user_id, from_id, to_id ) ret = [self._parse_event_from_row(r) for r in rows] -- cgit 1.4.1 From 063e1b22e62915ec77bfd3cb9477c29600acb568 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 15:06:00 +0100 Subject: Stop internal keys from getting into SynapseEvents --- synapse/api/events/__init__.py | 1 + synapse/storage/_base.py | 5 +++++ 2 files changed, 6 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 921fd08832..aa04dbece7 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -51,6 +51,7 @@ class SynapseEvent(JsonEncodedObject): "depth", "destinations", "origin", + "outlier", ] required_keys = [ diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 36cc57c1b8..75aab2d3b9 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -294,6 +294,11 @@ class SQLBaseStore(object): def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + + d.pop("stream_ordering", None) + d.pop("topological_ordering", None) + d.pop("processed", None) + d.update(json.loads(row_dict["unrecognized_keys"])) d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] -- cgit 1.4.1 From e7ee0b9fc113b1fd29b8cb96eea7a00641e56887 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Aug 2014 16:40:21 +0100 Subject: Change IM sync api to also return the current presence list. --- synapse/handlers/room.py | 24 +++++++++++++++++++++--- synapse/storage/stream.py | 5 ++--- 2 files changed, 23 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d9809bd6de..899b653fb7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomConfigEvent ) from synapse.api.streams.event import EventStream, EventsStreamData +from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -257,7 +258,19 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) - ret = [] + rooms_ret = [] + + now_rooms_token = yield self.store.get_room_events_max_id() + + # FIXME (erikj): Fix this. + presence_stream = PresenceStreamData(self.hs) + now_presence_token = yield presence_stream.max_token() + presence = yield presence_stream.get_rows( + user_id, 0, now_presence_token, None, None + ) + + # FIXME (erikj): We need to not generate this token, + now_token = "%s_%s" % (now_rooms_token, now_presence_token) for event in room_list: d = { @@ -268,14 +281,15 @@ class MessageHandler(BaseHandler): if event.membership == Membership.INVITE: d["inviter"] = event.user_id - ret.append(d) + rooms_ret.append(d) if event.membership != Membership.JOIN: continue try: messages, token = yield self.store.get_recent_events_for_room( event.room_id, - limit=50, + limit=10, + end_token=now_rooms_token, ) d["messages"] = { @@ -289,6 +303,10 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") + user = self.hs.parse_userid(user_id) + + ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} + logger.debug("snapshot_all_rooms returning: %s", ret) defer.returnValue(ret) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index e994017bf2..8bc502483a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -249,11 +249,10 @@ class StreamStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, with_feedback=False): + def get_recent_events_for_room(self, room_id, limit, end_token, + with_feedback=False): # TODO (erikj): Handle compressed feedback - end_token = yield self.get_room_events_max_id() - sql = ( "SELECT * FROM events " "WHERE room_id = ? AND stream_ordering <= ? " -- cgit 1.4.1 From 808f663ed179dcb16a315810f7f0f8a7eec77e01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Aug 2014 13:06:07 +0100 Subject: Don't return state event outlier's when paginating. --- synapse/storage/__init__.py | 7 ++++++- synapse/storage/schema/im.sql | 1 + synapse/storage/stream.py | 3 ++- 3 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7732906927..d06033b980 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -105,6 +105,11 @@ class DataStore(RoomMemberStore, RoomStore, "processed": True, } + if hasattr(event, "outlier"): + vals["outlier"] = event.outlier + else: + vals["outlier"] = False + if backfilled: if not self.min_token_deferred.called: yield self.min_token_deferred @@ -123,7 +128,7 @@ class DataStore(RoomMemberStore, RoomStore, except: logger.exception( "Failed to persist, probably duplicate: %s", - event_id + event.event_id ) return diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index ea04261ff0..39a1ed703e 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS events( content TEXT NOT NULL, unrecognized_keys TEXT, processed BOOL NOT NULL, + outlier BOOL NOT NULL, CONSTRAINT ev_uniq UNIQUE (event_id) ); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8bc502483a..87ae961ccd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -177,6 +177,7 @@ class StreamStore(SQLBaseStore): "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering < ? " + "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "current": current_room_membership_sql, @@ -224,7 +225,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events " - "WHERE room_id = ? AND %(bounds)s " + "WHERE outlier = 0 AND room_id = ? AND %(bounds)s " "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " ) % {"bounds": bounds, "order": order, "limit": limit_str} -- cgit 1.4.1