summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/replication/resource.py3
-rw-r--r--synapse/replication/slave/storage/events.py21
-rw-r--r--synapse/storage/event_federation.py2
-rw-r--r--synapse/storage/events.py22
-rw-r--r--synapse/storage/roommember.py6
-rw-r--r--synapse/storage/schema/delta/40/current_state_idx.sql17
-rw-r--r--synapse/storage/state.py8
-rw-r--r--tests/replication/slave/storage/test_events.py43
8 files changed, 35 insertions, 87 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index a30e647474..d8eb14592b 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -299,9 +299,6 @@ class ReplicationResource(Resource):
                 "backward_ex_outliers", res.backward_ex_outliers,
                 ("position", "event_id", "state_group"),
             )
-            writer.write_header_and_rows(
-                "state_resets", res.state_resets, ("position",),
-            )
 
     @defer.inlineCallbacks
     def presence(self, writer, current_token, request_streams):
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b3f3bf7488..15a025a019 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore):
         return result
 
     def process_replication(self, result):
-        state_resets = set(
-            r[0] for r in result.get("state_resets", {"rows": []})["rows"]
-        )
-
         stream = result.get("events")
         if stream:
             self._stream_id_gen.advance(int(stream["position"]))
@@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore):
 
             for row in stream["rows"]:
                 self._process_replication_row(
-                    row, backfilled=False, state_resets=state_resets
+                    row, backfilled=False,
                 )
 
         stream = result.get("backfill")
@@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore):
             self._backfill_id_gen.advance(-int(stream["position"]))
             for row in stream["rows"]:
                 self._process_replication_row(
-                    row, backfilled=True, state_resets=state_resets
+                    row, backfilled=True,
                 )
 
         stream = result.get("forward_ex_outliers")
@@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore):
 
         return super(SlavedEventStore, self).process_replication(result)
 
-    def _process_replication_row(self, row, backfilled, state_resets):
-        position = row[0]
+    def _process_replication_row(self, row, backfilled):
         internal = json.loads(row[1])
         event_json = json.loads(row[2])
         event = FrozenEvent(event_json, internal_metadata_dict=internal)
         self.invalidate_caches_for_event(
-            event, backfilled, reset_state=position in state_resets
+            event, backfilled,
         )
 
-    def invalidate_caches_for_event(self, event, backfilled, reset_state):
-        if reset_state:
-            self.get_rooms_for_user.invalidate_all()
-            self.get_users_in_room.invalidate((event.room_id,))
-
+    def invalidate_caches_for_event(self, event, backfilled):
         self._invalidate_get_event_cache(event.event_id)
 
         self.get_latest_event_ids_in_room.invalidate((event.room_id,))
@@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore):
             self._invalidate_get_event_cache(event.redacts)
 
         if event.type == EventTypes.Member:
-            self.get_rooms_for_user.invalidate((event.state_key,))
-            self.get_users_in_room.invalidate((event.room_id,))
             self._membership_stream_cache.entity_has_changed(
                 event.state_key, event.internal_metadata.stream_ordering
             )
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index f0aa2193fb..ee88c61954 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -129,7 +129,7 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )
 
-    @cached()
+    @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
             table="event_forward_extremities",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6685b9da1c..8659f605a5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -28,6 +28,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.state import resolve_events
+from synapse.util.caches.descriptors import cached
 
 from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
@@ -572,14 +573,6 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_users_in_room, (room_id,)
                 )
 
-                # Add an entry to the current_state_resets table to record the point
-                # where we clobbered the current state
-                self._simple_insert_txn(
-                    txn,
-                    table="current_state_resets",
-                    values={"event_stream_ordering": max_stream_order}
-                )
-
         for room_id, new_extrem in new_forward_extremeties.items():
             self._simple_delete_txn(
                 txn,
@@ -1579,6 +1572,7 @@ class EventsStore(SQLBaseStore):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
 
+    @cached(num_args=5, max_entries=10)
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
         """Get all the new events that have arrived at the server either as
@@ -1611,15 +1605,6 @@ class EventsStore(SQLBaseStore):
                     upper_bound = current_forward_id
 
                 sql = (
-                    "SELECT event_stream_ordering FROM current_state_resets"
-                    " WHERE ? < event_stream_ordering"
-                    " AND event_stream_ordering <= ?"
-                    " ORDER BY event_stream_ordering ASC"
-                )
-                txn.execute(sql, (last_forward_id, upper_bound))
-                state_resets = txn.fetchall()
-
-                sql = (
                     "SELECT event_stream_ordering, event_id, state_group"
                     " FROM ex_outlier_stream"
                     " WHERE ? > event_stream_ordering"
@@ -1630,7 +1615,6 @@ class EventsStore(SQLBaseStore):
                 forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
-                state_resets = []
                 forward_ex_outliers = []
 
             sql = (
@@ -1670,7 +1654,6 @@ class EventsStore(SQLBaseStore):
             return AllNewEventsResult(
                 new_forward_events, new_backfill_events,
                 forward_ex_outliers, backward_ex_outliers,
-                state_resets,
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
@@ -1896,5 +1879,4 @@ class EventsStore(SQLBaseStore):
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",
     "forward_ex_outliers", "backward_ex_outliers",
-    "state_resets"
 ])
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0fdcf29085..ee800d074f 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore):
         )
 
         for event in events:
-            txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
-            txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering
@@ -220,7 +218,7 @@ class RoomMemberStore(SQLBaseStore):
                 " ON e.event_id = c.event_id"
                 " AND m.room_id = c.room_id"
                 " AND m.user_id = c.state_key"
-                " WHERE %s"
+                " WHERE c.type = 'm.room.member' AND %s"
             ) % (where_clause,)
 
             txn.execute(sql, args)
@@ -266,7 +264,7 @@ class RoomMemberStore(SQLBaseStore):
             " ON m.event_id = c.event_id "
             " AND m.room_id = c.room_id "
             " AND m.user_id = c.state_key"
-            " WHERE %(where)s"
+            " WHERE c.type = 'm.room.member' AND %(where)s"
         ) % {
             "where": where_clause,
         }
diff --git a/synapse/storage/schema/delta/40/current_state_idx.sql b/synapse/storage/schema/delta/40/current_state_idx.sql
new file mode 100644
index 0000000000..7ffa189f39
--- /dev/null
+++ b/synapse/storage/schema/delta/40/current_state_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('current_state_members_idx', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index d1d653327c..1b3800eb6a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -49,6 +49,7 @@ class StateStore(SQLBaseStore):
 
     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
     STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
+    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
 
     def __init__(self, hs):
         super(StateStore, self).__init__(hs)
@@ -60,6 +61,13 @@ class StateStore(SQLBaseStore):
             self.STATE_GROUP_INDEX_UPDATE_NAME,
             self._background_index_state,
         )
+        self.register_background_index_update(
+            self.CURRENT_STATE_INDEX_UPDATE_NAME,
+            index_name="current_state_events_member_index",
+            table="current_state_events",
+            columns=["state_key"],
+            where_clause="type='m.room.member'",
+        )
 
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 6acb8ab758..105e1228bb 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -59,49 +59,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         [unpatch() for unpatch in self.unpatches]
 
     @defer.inlineCallbacks
-    def test_room_members(self):
-        yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [])
-        yield self.check("get_users_in_room", (ROOM_ID,), [])
-
-        # Join the room.
-        join = yield self.persist(type="m.room.member", key=USER_ID, membership="join")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [RoomsForUser(
-            room_id=ROOM_ID,
-            sender=USER_ID,
-            membership="join",
-            event_id=join.event_id,
-            stream_ordering=join.internal_metadata.stream_ordering,
-        )])
-        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID])
-
-        # Leave the room.
-        yield self.persist(type="m.room.member", key=USER_ID, membership="leave")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [])
-        yield self.check("get_users_in_room", (ROOM_ID,), [])
-
-        # Add some other user to the room.
-        join = yield self.persist(type="m.room.member", key=USER_ID_2, membership="join")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID_2,), [RoomsForUser(
-            room_id=ROOM_ID,
-            sender=USER_ID,
-            membership="join",
-            event_id=join.event_id,
-            stream_ordering=join.internal_metadata.stream_ordering,
-        )])
-        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2])
-
-        yield self.persist(
-            type="m.room.member", key=USER_ID, membership="join",
-        )
-        yield self.replicate()
-        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2, USER_ID])
-
-    @defer.inlineCallbacks
     def test_get_latest_event_ids_in_room(self):
         create = yield self.persist(type="m.room.create", key="", creator=USER_ID)
         yield self.replicate()