summary refs log tree commit diff
path: root/synapse/replication/slave
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/slave')
-rw-r--r--synapse/replication/slave/storage/events.py27
-rw-r--r--synapse/replication/slave/storage/keys.py21
-rw-r--r--synapse/replication/slave/storage/presence.py10
3 files changed, 26 insertions, 32 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 4830c68f35..b457c5563f 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,6 +16,10 @@
 import logging
 
 from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams.events import (
+    EventsStreamCurrentStateRow,
+    EventsStreamEventRow,
+)
 from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
@@ -79,11 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore,
         if stream_name == "events":
             self._stream_id_gen.advance(token)
             for row in rows:
-                self.invalidate_caches_for_event(
-                    token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts,
-                    backfilled=False,
-                )
+                self._process_event_stream_row(token, row)
         elif stream_name == "backfill":
             self._backfill_id_gen.advance(-token)
             for row in rows:
@@ -96,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore,
             stream_name, token, rows
         )
 
+    def _process_event_stream_row(self, token, row):
+        data = row.data
+
+        if row.type == EventsStreamEventRow.TypeId:
+            self.invalidate_caches_for_event(
+                token, data.event_id, data.room_id, data.type, data.state_key,
+                data.redacts,
+                backfilled=False,
+            )
+        elif row.type == EventsStreamCurrentStateRow.TypeId:
+            if data.type == EventTypes.Member:
+                self.get_rooms_for_user_with_stream_ordering.invalidate(
+                    (data.state_key, ),
+                )
+        else:
+            raise Exception("Unknown events stream row type %s" % (row.type, ))
+
     def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
                                     etype, state_key, redacts, backfilled):
         self._invalidate_get_event_cache(event_id)
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index 8032f53fec..cc6f7f009f 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -13,22 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import DataStore
-from synapse.storage.keys import KeyStore
+from synapse.storage import KeyStore
 
-from ._base import BaseSlavedStore, __func__
+# KeyStore isn't really safe to use from a worker, but for now we do so and hope that
+# the races it creates aren't too bad.
 
-
-class SlavedKeyStore(BaseSlavedStore):
-    _get_server_verify_key = KeyStore.__dict__[
-        "_get_server_verify_key"
-    ]
-
-    get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
-    store_server_verify_key = __func__(DataStore.store_server_verify_key)
-
-    get_server_certificate = __func__(DataStore.get_server_certificate)
-    store_server_certificate = __func__(DataStore.store_server_certificate)
-
-    get_server_keys_json = __func__(DataStore.get_server_keys_json)
-    store_server_keys_json = __func__(DataStore.store_server_keys_json)
+SlavedKeyStore = KeyStore
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 9e530defe0..0ec1db25ce 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -39,16 +39,6 @@ class SlavedPresenceStore(BaseSlavedStore):
     _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
     get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
 
-    # XXX: This is a bit broken because we don't persist the accepted list in a
-    # way that can be replicated. This means that we don't have a way to
-    # invalidate the cache correctly.
-    get_presence_list_accepted = PresenceStore.__dict__[
-        "get_presence_list_accepted"
-    ]
-    get_presence_list_observers_accepted = PresenceStore.__dict__[
-        "get_presence_list_observers_accepted"
-    ]
-
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()