summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-05-16 10:18:53 +0100
committerErik Johnston <erik@matrix.org>2019-05-16 10:38:13 +0100
commitb5c62c6b2643a138956af0b04521540c270a3e94 (patch)
tree67752ed0fce75f373eb58553c0b2039c2f7d0fbe /synapse/replication
parentAdd cache to relations (diff)
downloadsynapse-b5c62c6b2643a138956af0b04521540c270a3e94.tar.xz
Fix relations in worker mode
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/storage/events.py13
-rw-r--r--synapse/replication/tcp/streams/_base.py1
-rw-r--r--synapse/replication/tcp/streams/events.py11
3 files changed, 17 insertions, 8 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b457c5563f..797450bc66 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.replication.tcp.streams.events import (
 from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.relations import RelationsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
@@ -52,6 +53,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                        EventsWorkerStore,
                        SignatureWorkerStore,
                        UserErasureWorkerStore,
+                       RelationsWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
@@ -89,7 +91,7 @@ class SlavedEventStore(EventFederationWorkerStore,
             for row in rows:
                 self.invalidate_caches_for_event(
                     -token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts,
+                    row.redacts, row.relates_to,
                     backfilled=True,
                 )
         return super(SlavedEventStore, self).process_replication_rows(
@@ -102,7 +104,7 @@ class SlavedEventStore(EventFederationWorkerStore,
         if row.type == EventsStreamEventRow.TypeId:
             self.invalidate_caches_for_event(
                 token, data.event_id, data.room_id, data.type, data.state_key,
-                data.redacts,
+                data.redacts, data.relates_to,
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
@@ -114,7 +116,8 @@ class SlavedEventStore(EventFederationWorkerStore,
             raise Exception("Unknown events stream row type %s" % (row.type, ))
 
     def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
-                                    etype, state_key, redacts, backfilled):
+                                    etype, state_key, redacts, relates_to,
+                                    backfilled):
         self._invalidate_get_event_cache(event_id)
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
@@ -136,3 +139,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                 state_key, stream_ordering
             )
             self.get_invited_rooms_for_user.invalidate((state_key,))
+
+        if relates_to:
+            self.get_relations_for_event.invalidate_many((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 8971a6a22e..b6ce7a7bee 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -32,6 +32,7 @@ BackfillStreamRow = namedtuple("BackfillStreamRow", (
     "type",  # str
     "state_key",  # str, optional
     "redacts",  # str, optional
+    "relates_to",  # str, optional
 ))
 PresenceStreamRow = namedtuple("PresenceStreamRow", (
     "user_id",  # str
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index e0f6e29248..f1290d022a 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -80,11 +80,12 @@ class BaseEventsStreamRow(object):
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
-    event_id = attr.ib()   # str
-    room_id = attr.ib()    # str
-    type = attr.ib()       # str
-    state_key = attr.ib()  # str, optional
-    redacts = attr.ib()    # str, optional
+    event_id = attr.ib()    # str
+    room_id = attr.ib()     # str
+    type = attr.ib()        # str
+    state_key = attr.ib()   # str, optional
+    redacts = attr.ib()     # str, optional
+    relates_to = attr.ib()  # str, optional
 
 
 @attr.s(slots=True, frozen=True)