diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 63afca9ad7..5e52e9fecf 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -16,8 +16,9 @@
from twisted.internet import defer
from synapse.api.events.room import (
- RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent,
- RoomConfigEvent, RoomNameEvent,
+ RoomMemberEvent, RoomTopicEvent, FeedbackEvent,
+# RoomConfigEvent,
+ RoomNameEvent,
)
from synapse.util.logutils import log_function
@@ -116,6 +117,11 @@ class DataStore(RoomMemberStore, RoomStore,
if stream_ordering is not None:
vals["stream_ordering"] = stream_ordering
+ if hasattr(event, "outlier"):
+ vals["outlier"] = event.outlier
+ else:
+ vals["outlier"] = False
+
unrec = {
k: v
for k, v in event.get_full_dict().items()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index cfbe85d798..33d56f47ce 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -296,6 +296,11 @@ class SQLBaseStore(object):
def _parse_event_from_row(self, row_dict):
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+
+ d.pop("stream_ordering", None)
+ d.pop("topological_ordering", None)
+ d.pop("processed", None)
+
d.update(json.loads(row_dict["unrecognized_keys"]))
d["content"] = json.loads(d["content"])
del d["unrecognized_keys"]
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index d2d1e1d1e8..336192ad01 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -15,11 +15,7 @@
from twisted.internet import defer
-from ._base import SQLBaseStore, Table
-from synapse.api.events.room import FeedbackEvent
-
-import collections
-import json
+from ._base import SQLBaseStore
class FeedbackStore(SQLBaseStore):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 1ae3220197..d1f1a232f8 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -18,12 +18,10 @@ from twisted.internet import defer
from sqlite3 import IntegrityError
from synapse.api.errors import StoreError
-from synapse.api.events.room import RoomTopicEvent
from ._base import SQLBaseStore, Table
import collections
-import json
import logging
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 1df043cd36..5038aeea03 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -15,15 +15,10 @@
from twisted.internet import defer
-from synapse.types import UserID
-from synapse.api.constants import Membership
-from synapse.api.events.room import RoomMemberEvent
-
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore
+from synapse.api.constants import Membership
-import collections
-import json
import logging
logger = logging.getLogger(__name__)
@@ -34,14 +29,15 @@ class RoomMemberStore(SQLBaseStore):
def _store_room_member_txn(self, txn, event):
"""Store a room member in the database.
"""
- domain = self.hs.parse_userid(event.target_user_id).domain
+ target_user_id = event.state_key
+ domain = self.hs.parse_userid(target_user_id).domain
self._simple_insert_txn(
txn,
"room_memberships",
{
"event_id": event.event_id,
- "user_id": event.target_user_id,
+ "user_id": target_user_id,
"sender": event.user_id,
"room_id": event.room_id,
"membership": event.membership,
@@ -145,7 +141,28 @@ class RoomMemberStore(SQLBaseStore):
rows = yield self._execute_and_decode(sql, *where_values)
- logger.debug("_get_members_query Got rows %s", rows)
+ # logger.debug("_get_members_query Got rows %s", rows)
results = [self._parse_event_from_row(r) for r in rows]
defer.returnValue(results)
+
+ @defer.inlineCallbacks
+ def do_users_share_a_room(self, user_list):
+ """ Checks whether a list of users share a room.
+ """
+ user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_list))
+ sql = (
+ "SELECT m.room_id FROM room_memberships as m "
+ "INNER JOIN current_state_events as c "
+ "ON m.event_id = c.event_id "
+ "WHERE m.membership = 'join' "
+ "AND (%(clause)s) "
+ "GROUP BY m.room_id HAVING COUNT(m.room_id) = ?"
+ ) % {"clause": user_list_clause}
+
+ args = user_list
+ args.append(len(user_list))
+
+ rows = yield self._execute(None, sql, *args)
+
+ defer.returnValue(len(rows) > 0)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index ea04261ff0..e92f21ef3b 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -22,9 +22,15 @@ CREATE TABLE IF NOT EXISTS events(
content TEXT NOT NULL,
unrecognized_keys TEXT,
processed BOOL NOT NULL,
+ outlier BOOL NOT NULL,
CONSTRAINT ev_uniq UNIQUE (event_id)
);
+CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
+CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
+CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
+CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
+
CREATE TABLE IF NOT EXISTS state_events(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
@@ -33,6 +39,12 @@ CREATE TABLE IF NOT EXISTS state_events(
prev_state TEXT
);
+CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
+CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
+CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
+CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
+
+
CREATE TABLE IF NOT EXISTS current_state_events(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
@@ -41,6 +53,11 @@ CREATE TABLE IF NOT EXISTS current_state_events(
CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
);
+CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
+CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
+CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
+
CREATE TABLE IF NOT EXISTS room_memberships(
event_id TEXT NOT NULL,
user_id TEXT NOT NULL,
@@ -49,6 +66,10 @@ CREATE TABLE IF NOT EXISTS room_memberships(
membership TEXT NOT NULL
);
+CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
+CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
+
CREATE TABLE IF NOT EXISTS feedback(
event_id TEXT NOT NULL,
feedback_type TEXT,
@@ -77,5 +98,6 @@ CREATE TABLE IF NOT EXISTS rooms(
CREATE TABLE IF NOT EXISTS room_hosts(
room_id TEXT NOT NULL,
- host TEXT NOT NULL
+ host TEXT NOT NULL,
+ CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 7460bf28d7..ac887e2957 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,10 +37,8 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.api.errors import SynapseError
-from synapse.api.constants import Membership
from synapse.util.logutils import log_function
-import json
import logging
@@ -177,6 +175,7 @@ class StreamStore(SQLBaseStore):
"((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) "
"AND e.stream_ordering > ? AND e.stream_ordering < ? "
+ "AND e.outlier = 0 "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % {
"current": current_room_membership_sql,
@@ -224,7 +223,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT * FROM events "
- "WHERE room_id = ? AND %(bounds)s "
+ "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
"ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
) % {"bounds": bounds, "order": order, "limit": limit_str}
@@ -249,15 +248,14 @@ class StreamStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+ def get_recent_events_for_room(self, room_id, limit, end_token,
+ with_feedback=False):
# TODO (erikj): Handle compressed feedback
- end_token = yield self.get_room_events_max_id()
-
sql = (
"SELECT * FROM events "
"WHERE room_id = ? AND stream_ordering <= ? "
- "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? "
+ "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
)
rows = yield self._execute_and_decode(
|