Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/_base.py               41
-rw-r--r--  synapse/storage/deviceinbox.py          6
-rw-r--r--  synapse/storage/devices.py             19
-rw-r--r--  synapse/storage/events.py              49
-rw-r--r--  synapse/storage/roommember.py          13
-rw-r--r--  synapse/storage/state.py               14
-rw-r--r--  synapse/storage/stream.py               3
-rw-r--r--  synapse/storage/util/id_generators.py  14
8 files changed, 136 insertions, 23 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a7a8ec9b7b..13b106bba1 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -840,6 +840,47 @@ class SQLBaseStore(object):
 
         return txn.execute(sql, keyvalues.values())
 
+    def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
+        return self.runInteraction(
+            desc, self._simple_delete_many_txn, table, column, iterable, keyvalues
+        )
+
+    @staticmethod
+    def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
+        """Executes a DELETE query on the named table.
+
+        Filters rows by whether the value of `column` is in `iterable`.
+
+        Args:
+            txn : Transaction object
+            table : string giving the table name
+            column : column name to test for inclusion against `iterable`
+            iterable : list of values that `column` is tested against
+            keyvalues : dict of column names and values to select the rows with
+        """
+        if not iterable:
+            return
+
+        sql = "DELETE FROM %s" % table
+
+        clauses = []
+        values = []
+        clauses.append(
+            "%s IN (%s)" % (column, ",".join("?" for _ in iterable))
+        )
+        values.extend(iterable)
+
+        for key, value in keyvalues.items():
+            clauses.append("%s = ?" % (key,))
+            values.append(value)
+
+        if clauses:
+            sql = "%s WHERE %s" % (
+                sql,
+                " AND ".join(clauses),
+            )
+        return txn.execute(sql, values)
+
     def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
                         max_value, limit=100000):
         # Fetch a mapping of room_id -> max stream position for "recent" rooms.
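
The helper above only assembles a parameterised DELETE. As a quick illustration, here is a standalone sketch of the query shape it produces (plain sqlite3 with a made-up table and values, not Synapse's runInteraction machinery): one "column IN (?, ...)" clause plus one "key = ?" clause per keyvalue, joined with AND.

# Standalone sketch (illustrative table/values, sqlite3 instead of Synapse's
# transaction wrapper) of the query _simple_delete_many_txn builds.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE devices (user_id TEXT, device_id TEXT)")
conn.executemany(
    "INSERT INTO devices VALUES (?, ?)",
    [("@alice:example.com", d) for d in ("d1", "d2", "d3")],
)

def delete_many(conn, table, column, iterable, keyvalues):
    # Same shape as _simple_delete_many_txn: an IN clause for `column` plus an
    # equality clause per keyvalue, all parameterised.
    if not iterable:
        return
    clauses = ["%s IN (%s)" % (column, ",".join("?" for _ in iterable))]
    values = list(iterable)
    for key, value in keyvalues.items():
        clauses.append("%s = ?" % (key,))
        values.append(value)
    sql = "DELETE FROM %s WHERE %s" % (table, " AND ".join(clauses))
    conn.execute(sql, values)

delete_many(conn, "devices", "device_id", ["d1", "d2"],
            {"user_id": "@alice:example.com"})
print(conn.execute("SELECT device_id FROM devices").fetchall())  # [('d3',)]
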
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 5c7db5e5f6..7925cb5f1b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore):
         """
         Args:
             destination(str): The name of the remote server.
-            last_stream_id(int): The last position of the device message stream
+            last_stream_id(int|long): The last position of the device message stream
                 that the server sent up to.
-            current_stream_id(int): The current position of the device
+            current_stream_id(int|long): The current position of the device
                 message stream.
         Returns:
-            Deferred ([dict], int): List of messages for the device and where
+            Deferred ([dict], int|long): List of messages for the device and where
                 in the stream the messages got to.
         """
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index bd56ba2515..e545b62e39 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -108,6 +108,23 @@ class DeviceStore(SQLBaseStore):
             desc="delete_device",
         )
 
+    def delete_devices(self, user_id, device_ids):
+        """Deletes several devices.
+
+        Args:
+            user_id (str): The ID of the user who owns the devices
+            device_ids (list[str]): The IDs of the devices to delete
+        Returns:
+            defer.Deferred
+        """
+        return self._simple_delete_many(
+            table="devices",
+            column="device_id",
+            iterable=device_ids,
+            keyvalues={"user_id": user_id},
+            desc="delete_devices",
+        )
+
     def update_device(self, user_id, device_id, new_display_name=None):
         """Update a device.
 
@@ -291,7 +308,7 @@ class DeviceStore(SQLBaseStore):
         """Get stream of updates to send to remote servers
 
         Returns:
-            (now_stream_id, [ { updates }, .. ])
+            (int, list[dict]): current stream id and list of updates
         """
         now_stream_id = self._device_list_id_gen.get_current_token()
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index db01eb6d14..72319c35ae 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -433,23 +433,43 @@ class EventsStore(SQLBaseStore):
         if not new_latest_event_ids:
             current_state = {}
         elif was_updated:
+            # We work out the current state by passing the state sets to the
+            # state resolution algorithm. It may ask for some events, including
+            # the events we have yet to persist, so we need a slightly more
+            # complicated event lookup function than simply looking the events
+            # up in the db.
+            events_map = {ev.event_id: ev for ev, _ in events_context}
+
+            @defer.inlineCallbacks
+            def get_events(ev_ids):
+                # We get the events by first looking at the list of events we
+                # are trying to persist, and then fetching the rest from the DB.
+                db = []
+                to_return = {}
+                for ev_id in ev_ids:
+                    ev = events_map.get(ev_id, None)
+                    if ev:
+                        to_return[ev_id] = ev
+                    else:
+                        db.append(ev_id)
+
+                if db:
+                    evs = yield self.get_events(
+                        db, get_prev_content=False, check_redacted=False,
+                    )
+                    to_return.update(evs)
+                defer.returnValue(to_return)
+
             current_state = yield resolve_events(
                 state_sets,
-                state_map_factory=lambda ev_ids: self.get_events(
-                    ev_ids, get_prev_content=False, check_redacted=False,
-                ),
+                state_map_factory=get_events,
             )
         else:
             return
 
-        existing_state_rows = yield self._simple_select_list(
-            table="current_state_events",
-            keyvalues={"room_id": room_id},
-            retcols=["event_id", "type", "state_key"],
-            desc="_calculate_state_delta",
-        )
+        existing_state = yield self.get_current_state_ids(room_id)
 
-        existing_events = set(row["event_id"] for row in existing_state_rows)
+        existing_events = set(existing_state.itervalues())
         new_events = set(ev_id for ev_id in current_state.itervalues())
         changed_events = existing_events ^ new_events
 
@@ -457,9 +477,8 @@ class EventsStore(SQLBaseStore):
             return
 
         to_delete = {
-            (row["type"], row["state_key"]): row["event_id"]
-            for row in existing_state_rows
-            if row["event_id"] in changed_events
+            key: ev_id for key, ev_id in existing_state.iteritems()
+            if ev_id in changed_events
         }
         events_to_insert = (new_events - existing_events)
         to_insert = {
@@ -585,6 +604,10 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_users_in_room, (room_id,)
                 )
 
+                self._invalidate_cache_and_stream(
+                    txn, self.get_current_state_ids, (room_id,)
+                )
+
         for room_id, new_extrem in new_forward_extremeties.items():
             self._simple_delete_txn(
                 txn,
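
The state-delta logic above boils down to set arithmetic on two (type, state_key) -> event_id maps. The sketch below replays it with invented dictionaries (and Python 3 .items()/.values() in place of iteritems/itervalues); the hunk cuts off the to_insert comprehension, so its body here is an assumption based on the surrounding pattern.

# Invented example state maps for one room.
existing_state = {
    ("m.room.member", "@alice:example.com"): "$old_alice",
    ("m.room.name", ""): "$name",
}
current_state = {
    ("m.room.member", "@alice:example.com"): "$new_alice",
    ("m.room.name", ""): "$name",
}

existing_events = set(existing_state.values())
new_events = set(current_state.values())
changed_events = existing_events ^ new_events  # symmetric difference

# Rows whose event id changed: delete the old id, insert the new one.
to_delete = {
    key: ev_id for key, ev_id in existing_state.items()
    if ev_id in changed_events
}
events_to_insert = new_events - existing_events
to_insert = {
    key: ev_id for key, ev_id in current_state.items()
    if ev_id in events_to_insert
}

print(to_delete)  # {('m.room.member', '@alice:example.com'): '$old_alice'}
print(to_insert)  # {('m.room.member', '@alice:example.com'): '$new_alice'}
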
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 545d3d3a99..e38d8927bf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -274,24 +274,27 @@ class RoomMemberStore(SQLBaseStore):
 
         return rows
 
-    @cached(max_entries=500000, iterable=True)
+    @cachedInlineCallbacks(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
-        return self.get_rooms_for_user_where_membership_is(
+        """Returns a set of room_ids the user is currently joined to
+        """
+        rooms = yield self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
+        defer.returnValue(frozenset(r.room_id for r in rooms))
 
     @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
     def get_users_who_share_room_with_user(self, user_id, cache_context):
         """Returns the set of users who share a room with `user_id`
         """
-        rooms = yield self.get_rooms_for_user(
+        room_ids = yield self.get_rooms_for_user(
             user_id, on_invalidate=cache_context.invalidate,
         )
 
         user_who_share_room = set()
-        for room in rooms:
+        for room_id in room_ids:
             user_ids = yield self.get_users_in_room(
-                room.room_id, on_invalidate=cache_context.invalidate,
+                room_id, on_invalidate=cache_context.invalidate,
             )
             user_who_share_room.update(user_ids)
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 84482d8285..27f1ec89ec 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 from synapse.util.caches import intern_string
 from synapse.storage.engines import PostgresEngine
 
@@ -69,6 +69,18 @@ class StateStore(SQLBaseStore):
             where_clause="type='m.room.member'",
         )
 
+    @cachedInlineCallbacks(max_entries=100000, iterable=True)
+    def get_current_state_ids(self, room_id):
+        """Returns a mapping from (event_type, state_key) to event_id for the
+        current state of the given room.
+        """
+        rows = yield self._simple_select_list(
+            table="current_state_events",
+            keyvalues={"room_id": room_id},
+            retcols=["event_id", "type", "state_key"],
+            desc="get_current_state_ids",
+        )
+        defer.returnValue({
+            (r["type"], r["state_key"]): r["event_id"] for r in rows
+        })
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
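
To make the cached value's shape concrete, here is a minimal sketch of the mapping get_current_state_ids builds from current_state_events rows; the row values are invented.

rows = [
    {"event_id": "$create", "type": "m.room.create", "state_key": ""},
    {"event_id": "$alice", "type": "m.room.member",
     "state_key": "@alice:example.com"},
]

# (event type, state key) -> event id, as cached per room.
current_state_ids = {
    (r["type"], r["state_key"]): r["event_id"] for r in rows
}
print(current_state_ids[("m.room.member", "@alice:example.com")])  # $alice
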
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 200d124632..dddd5fc0e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore):
             updatevalues={"stream_id": stream_id},
             desc="update_federation_out_pos",
         )
+
+    def has_room_changed_since(self, room_id, stream_id):
+        """Returns True if the room may have changed since the given stream id.
+        """
+        return self._events_stream_cache.has_entity_changed(room_id, stream_id)
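
has_room_changed_since simply defers to the events stream-change cache. The toy class below is a simplified stand-in, not Synapse's StreamChangeCache, but it captures the key behaviour: positions older than the cache's horizon conservatively report "changed".

class TinyStreamChangeCache(object):
    """Simplified stand-in for the per-entity stream change cache."""

    def __init__(self, earliest_known_stream_pos):
        self._earliest = earliest_known_stream_pos
        self._entity_to_pos = {}  # room_id -> last stream id it changed at

    def entity_has_changed(self, entity, stream_pos):
        current = self._entity_to_pos.get(entity, stream_pos)
        self._entity_to_pos[entity] = max(current, stream_pos)

    def has_entity_changed(self, entity, stream_pos):
        if stream_pos < self._earliest:
            return True  # too old to know, so assume it changed
        return self._entity_to_pos.get(entity, self._earliest) > stream_pos

cache = TinyStreamChangeCache(earliest_known_stream_pos=100)
cache.entity_has_changed("!a:example.com", 150)
print(cache.has_entity_changed("!a:example.com", 120))  # True
print(cache.has_entity_changed("!a:example.com", 160))  # False
print(cache.has_entity_changed("!b:example.com", 50))   # True, before the horizon
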
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 46cf93ff87..95031dc9ec 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -30,6 +30,17 @@ class IdGenerator(object):
 
 
 def _load_current_id(db_conn, table, column, step=1):
+    """
+    Loads the current id from the given table/column: the largest value for a
+    forward (positive step) stream, the smallest for a backward stream.
+
+    Args:
+        db_conn (object): Database connection
+        table (str): Name of the table to read the id from
+        column (str): Name of the column holding the stream ids
+        step (int): Stream direction: 1 reads the maximum id, -1 the minimum
+
+    Returns:
+        int
+    """
     cur = db_conn.cursor()
     if step == 1:
         cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
@@ -131,6 +142,9 @@ class StreamIdGenerator(object):
     def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
+
+        Returns:
+            int
         """
         with self._lock:
             if self._unfinished_ids:
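
A short sketch of the invariant documented in get_current_token, using plain ints as a stand-in for StreamIdGenerator's internal bookkeeping; the numbers are arbitrary.

current_max = 10          # highest id handed out so far
unfinished_ids = [8, 9]   # ids handed out but not yet persisted, oldest first

def get_current_token():
    # Everything up to and including the returned id has been persisted, so
    # the token cannot advance past the oldest in-flight id.
    if unfinished_ids:
        return unfinished_ids[0] - 1
    return current_max

print(get_current_token())  # 7
del unfinished_ids[:]       # both in-flight writes finish
print(get_current_token())  # 10
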