summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-26 13:55:37 +0100
committerErik Johnston <erik@matrix.org>2014-08-26 13:55:37 +0100
commit485bb64ddbcc372c5ed3d74190eda06f02cb3f82 (patch)
treee89ec33366b907bea794e9140f962d2951b29ff9 /synapse/storage
parentUse new StreamToken in pagination config (diff)
parentAdd the ability to turn on the twisted manhole telnet service. (diff)
downloadsynapse-485bb64ddbcc372c5ed3d74190eda06f02cb3f82.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py7
-rw-r--r--synapse/storage/_base.py5
-rw-r--r--synapse/storage/roommember.py28
-rw-r--r--synapse/storage/schema/im.sql24
-rw-r--r--synapse/storage/stream.py10
5 files changed, 64 insertions, 10 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7732906927..d06033b980 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -105,6 +105,11 @@ class DataStore(RoomMemberStore, RoomStore,
             "processed": True,
         }
 
+        if hasattr(event, "outlier"):
+            vals["outlier"] = event.outlier
+        else:
+            vals["outlier"] = False
+
         if backfilled:
             if not self.min_token_deferred.called:
                 yield self.min_token_deferred
@@ -123,7 +128,7 @@ class DataStore(RoomMemberStore, RoomStore,
         except:
             logger.exception(
                 "Failed to persist, probably duplicate: %s",
-                event_id
+                event.event_id
             )
             return
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 36cc57c1b8..75aab2d3b9 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -294,6 +294,11 @@ class SQLBaseStore(object):
 
     def _parse_event_from_row(self, row_dict):
         d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+
+        d.pop("stream_ordering", None)
+        d.pop("topological_ordering", None)
+        d.pop("processed", None)
+
         d.update(json.loads(row_dict["unrecognized_keys"]))
         d["content"] = json.loads(d["content"])
         del d["unrecognized_keys"]
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 89c87290cf..86519b60c6 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,13 +35,14 @@ class RoomMemberStore(SQLBaseStore):
     def _store_room_member(self, event):
         """Store a room member in the database.
         """
-        domain = self.hs.parse_userid(event.target_user_id).domain
+        target_user_id = event.state_key
+        domain = self.hs.parse_userid(target_user_id).domain
 
         yield self._simple_insert(
             "room_memberships",
             {
                 "event_id": event.event_id,
-                "user_id": event.target_user_id,
+                "user_id": target_user_id,
                 "sender": event.user_id,
                 "room_id": event.room_id,
                 "membership": event.membership,
@@ -145,7 +146,28 @@ class RoomMemberStore(SQLBaseStore):
 
         rows = yield self._execute_and_decode(sql, *where_values)
 
-        logger.debug("_get_members_query Got rows %s", rows)
+        # logger.debug("_get_members_query Got rows %s", rows)
 
         results = [self._parse_event_from_row(r) for r in rows]
         defer.returnValue(results)
+
+    @defer.inlineCallbacks
+    def do_users_share_a_room(self, user_list):
+        """ Checks whether a list of users share a room.
+        """
+        user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_list))
+        sql = (
+            "SELECT m.room_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c "
+            "ON m.event_id = c.event_id "
+            "WHERE m.membership = 'join' "
+            "AND (%(clause)s) "
+            "GROUP BY m.room_id HAVING COUNT(m.room_id) = ?"
+        ) % {"clause": user_list_clause}
+
+        args = user_list
+        args.append(len(user_list))
+
+        rows = yield self._execute(None, sql, *args)
+
+        defer.returnValue(len(rows) > 0)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index ea04261ff0..e92f21ef3b 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -22,9 +22,15 @@ CREATE TABLE IF NOT EXISTS events(
     content TEXT NOT NULL,
     unrecognized_keys TEXT,
     processed BOOL NOT NULL,
+    outlier BOOL NOT NULL,
     CONSTRAINT ev_uniq UNIQUE (event_id)
 );
 
+CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
+CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
+CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
+CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
+
 CREATE TABLE IF NOT EXISTS state_events(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
@@ -33,6 +39,12 @@ CREATE TABLE IF NOT EXISTS state_events(
     prev_state TEXT
 );
 
+CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
+CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
+CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
+CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
+
+
 CREATE TABLE IF NOT EXISTS current_state_events(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
@@ -41,6 +53,11 @@ CREATE TABLE IF NOT EXISTS current_state_events(
     CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
 );
 
+CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
+CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
+CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
+
 CREATE TABLE IF NOT EXISTS room_memberships(
     event_id TEXT NOT NULL,
     user_id TEXT NOT NULL,
@@ -49,6 +66,10 @@ CREATE TABLE IF NOT EXISTS room_memberships(
     membership TEXT NOT NULL
 );
 
+CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
+CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
+
 CREATE TABLE IF NOT EXISTS feedback(
     event_id TEXT NOT NULL,
     feedback_type TEXT,
@@ -77,5 +98,6 @@ CREATE TABLE IF NOT EXISTS rooms(
 
 CREATE TABLE IF NOT EXISTS room_hosts(
     room_id TEXT NOT NULL,
-    host TEXT NOT NULL
+    host TEXT NOT NULL,
+    CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
 );
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index e994017bf2..3a17a723fe 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -177,6 +177,7 @@ class StreamStore(SQLBaseStore):
             "((room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
             "AND e.stream_ordering > ? AND e.stream_ordering < ? "
+            "AND e.outlier = 0 "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
         ) % {
             "current": current_room_membership_sql,
@@ -224,7 +225,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT * FROM events "
-            "WHERE room_id = ? AND %(bounds)s "
+            "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
             "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
         ) % {"bounds": bounds, "order": order, "limit": limit_str}
 
@@ -249,15 +250,14 @@ class StreamStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+    def get_recent_events_for_room(self, room_id, limit, end_token,
+                                   with_feedback=False):
         # TODO (erikj): Handle compressed feedback
 
-        end_token = yield self.get_room_events_max_id()
-
         sql = (
             "SELECT * FROM events "
             "WHERE room_id = ? AND stream_ordering <= ? "
-            "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? "
+            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
         )
 
         rows = yield self._execute_and_decode(