summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/storage/receipts.py67
-rw-r--r--synapse/storage/stream.py157
3 files changed, 160 insertions, 67 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 298cb9bada..80187722ea 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -128,6 +128,9 @@ class EventsStore(SQLBaseStore):
                     is_new_state=is_new_state,
                     current_state=current_state,
                 )
+                self._events_stream_cache.room_has_changed(
+                    None, event.room_id, stream_ordering
+                )
         except _RollbackButIsFineException:
             pass
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c0593e23ee..7118368d97 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -15,11 +15,10 @@
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches.room_change_cache import RoomStreamChangeCache
 
 from twisted.internet import defer
 
-from blist import sorteddict
 import logging
 import ujson as json
 
@@ -31,8 +30,8 @@ class ReceiptsStore(SQLBaseStore):
     def __init__(self, hs):
         super(ReceiptsStore, self).__init__(hs)
 
-        self._receipts_stream_cache = _RoomStreamChangeCache(
-            self._receipts_id_gen.get_max_token(None)
+        self._receipts_stream_cache = RoomStreamChangeCache(
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
         )
 
     @cached(num_args=2)
@@ -370,63 +369,3 @@ class ReceiptsStore(SQLBaseStore):
                 "data": json.dumps(data),
             }
         )
-
-
-class _RoomStreamChangeCache(object):
-    """Keeps track of the stream_id of the latest change in rooms.
-
-    Given a list of rooms and stream key, it will give a subset of rooms that
-    may have changed since that key. If the key is too old then the cache
-    will simply return all rooms.
-    """
-    def __init__(self, current_key, size_of_cache=10000):
-        self._size_of_cache = size_of_cache
-        self._room_to_key = {}
-        self._cache = sorteddict()
-        self._earliest_key = current_key
-        self.name = "ReceiptsRoomChangeCache"
-        caches_by_name[self.name] = self._cache
-
-    @defer.inlineCallbacks
-    def get_rooms_changed(self, store, room_ids, key):
-        """Returns subset of room ids that have had new receipts since the
-        given key. If the key is too old it will just return the given list.
-        """
-        if key > (yield self._get_earliest_key(store)):
-            keys = self._cache.keys()
-            i = keys.bisect_right(key)
-
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(room_ids)
-
-            cache_counter.inc_hits(self.name)
-        else:
-            result = room_ids
-            cache_counter.inc_misses(self.name)
-
-        defer.returnValue(result)
-
-    @defer.inlineCallbacks
-    def room_has_changed(self, store, room_id, key):
-        """Informs the cache that the room has been changed at the given key.
-        """
-        if key > (yield self._get_earliest_key(store)):
-            old_key = self._room_to_key.get(room_id, None)
-            if old_key:
-                key = max(key, old_key)
-                self._cache.pop(old_key, None)
-            self._cache[key] = room_id
-
-            while len(self._cache) > self._size_of_cache:
-                k, r = self._cache.popitem()
-                self._earliest_key = max(k, self._earliest_key)
-                self._room_to_key.pop(r, None)
-
-    @defer.inlineCallbacks
-    def _get_earliest_key(self, store):
-        if self._earliest_key is None:
-            self._earliest_key = yield store.get_max_receipt_stream_id()
-            self._earliest_key = int(self._earliest_key)
-
-        defer.returnValue(self._earliest_key)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index e31bad258a..5096b46864 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,6 +37,7 @@ from twisted.internet import defer
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.room_change_cache import RoomStreamChangeCache
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.logutils import log_function
@@ -77,6 +78,12 @@ def upper_bound(token):
 
 
 class StreamStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(StreamStore, self).__init__(hs)
+
+        self._events_stream_cache = RoomStreamChangeCache(
+            "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
+        )
 
     @defer.inlineCallbacks
     def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
@@ -157,6 +164,134 @@ class StreamStore(SQLBaseStore):
         results = yield self.runInteraction("get_appservice_room_stream", f)
         defer.returnValue(results)
 
+    @defer.inlineCallbacks
+    def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
+
+        room_ids = yield self._events_stream_cache.get_rooms_changed(
+            self, room_ids, from_id
+        )
+
+        if not room_ids:
+            defer.returnValue({})
+
+        results = {}
+        room_ids = list(room_ids)
+        for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+            res = yield defer.gatherResults([
+                self.get_room_events_stream_for_room(
+                    room_id, from_key, to_key, limit
+                ).addCallback(lambda r, rm: (rm, r), room_id)
+                for room_id in room_ids
+            ])
+            results.update(dict(res))
+
+        defer.returnValue(results)
+
+    @defer.inlineCallbacks
+    def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
+        if from_key is not None:
+            from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        else:
+            from_id = None
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+        if from_key == to_key:
+            defer.returnValue(([], from_key))
+
+        has_changed = yield self._events_stream_cache.get_room_has_changed(
+            room_id, from_id
+        )
+
+        if not has_changed:
+            defer.returnValue(([], from_key))
+
+        def f(txn):
+            if from_id is not None:
+                sql = (
+                    "SELECT event_id, stream_ordering FROM events WHERE"
+                    " room_id = ?"
+                    " AND not outlier"
+                    " AND stream_ordering > ? AND stream_ordering <= ?"
+                    " ORDER BY stream_ordering DESC LIMIT ?"
+                )
+                txn.execute(sql, (room_id, from_id, to_id, limit))
+            else:
+                sql = (
+                    "SELECT event_id, stream_ordering FROM events WHERE"
+                    " room_id = ?"
+                    " AND not outlier"
+                    " AND stream_ordering <= ?"
+                    " ORDER BY stream_ordering DESC LIMIT ?"
+                )
+                txn.execute(sql, (room_id, to_id, limit))
+
+            rows = self.cursor_to_dict(txn)
+
+            ret = self._get_events_txn(
+                txn,
+                [r["event_id"] for r in rows],
+                get_prev_content=True
+            )
+
+            self._set_before_and_after(ret, rows, topo_order=False)
+
+            ret.reverse()
+
+            if rows:
+                key = "s%d" % min(r["stream_ordering"] for r in rows)
+            else:
+                # Assume we didn't get anything because there was nothing to
+                # get.
+                key = from_key
+
+            return ret, key
+        res = yield self.runInteraction("get_room_events_stream_for_room", f)
+        defer.returnValue(res)
+
+    def get_room_changes_for_user(self, user_id, from_key, to_key):
+        if from_key is not None:
+            from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        else:
+            from_id = None
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+        if from_key == to_key:
+            return defer.succeed([])
+
+        def f(txn):
+            if from_id is not None:
+                sql = (
+                    "SELECT m.event_id, stream_ordering FROM events AS e,"
+                    " room_memberships AS m"
+                    " WHERE e.event_id = m.event_id"
+                    " AND m.user_id = ?"
+                    " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+                    " ORDER BY e.stream_ordering ASC"
+                )
+                txn.execute(sql, (user_id, from_id, to_id,))
+            else:
+                sql = (
+                    "SELECT m.event_id, stream_ordering FROM events AS e,"
+                    " room_memberships AS m"
+                    " WHERE e.event_id = m.event_id"
+                    " AND m.user_id = ?"
+                    " AND stream_ordering <= ?"
+                    " ORDER BY stream_ordering ASC"
+                )
+                txn.execute(sql, (user_id, to_id,))
+            rows = self.cursor_to_dict(txn)
+
+            ret = self._get_events_txn(
+                txn,
+                [r["event_id"] for r in rows],
+                get_prev_content=True
+            )
+
+            return ret
+
+        return self.runInteraction("get_room_changes_for_user", f)
+
     @log_function
     def get_room_events_stream(
         self,
@@ -174,7 +309,8 @@ class StreamStore(SQLBaseStore):
                 "SELECT c.room_id FROM history_visibility AS h"
                 " INNER JOIN current_state_events AS c"
                 " ON h.event_id = c.event_id"
-                " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % (
+                " WHERE c.room_id IN (%s)"
+                " AND h.history_visibility = 'world_readable'" % (
                     ",".join(map(lambda _: "?", room_ids))
                 )
             )
@@ -434,6 +570,18 @@ class StreamStore(SQLBaseStore):
             row["topological_ordering"], row["stream_ordering"],)
         )
 
+    def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
+        sql = (
+            "SELECT max(topological_ordering) FROM events"
+            " WHERE room_id = ? AND stream_ordering < ?"
+        )
+        return self._execute(
+            "get_max_topological_token_for_stream_and_room", None,
+            sql, room_id, stream_key,
+        ).addCallback(
+            lambda r: r[0][0] if r else 0
+        )
+
     def _get_max_topological_txn(self, txn):
         txn.execute(
             "SELECT MAX(topological_ordering) FROM events"
@@ -445,10 +593,13 @@ class StreamStore(SQLBaseStore):
         return rows[0][0] if rows else 0
 
     @staticmethod
-    def _set_before_and_after(events, rows):
+    def _set_before_and_after(events, rows, topo_order=True):
         for event, row in zip(events, rows):
             stream = row["stream_ordering"]
-            topo = event.depth
+            if topo_order:
+                topo = event.depth
+            else:
+                topo = None
             internal = event.internal_metadata
             internal.before = str(RoomStreamToken(topo, stream - 1))
             internal.after = str(RoomStreamToken(topo, stream))