summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py10
-rw-r--r--synapse/replication/slave/storage/events.py14
-rw-r--r--synapse/replication/tcp/streams/_base.py1
-rw-r--r--synapse/replication/tcp/streams/events.py11
4 files changed, 27 insertions, 9 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index e81456ab2b..0a432a16fa 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -17,11 +17,17 @@ import abc
 import logging
 import re
 
+from six import raise_from
 from six.moves import urllib
 
 from twisted.internet import defer
 
-from synapse.api.errors import CodeMessageException, HttpResponseException
+from synapse.api.errors import (
+    CodeMessageException,
+    HttpResponseException,
+    RequestSendFailed,
+    SynapseError,
+)
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string
 
@@ -175,6 +181,8 @@ class ReplicationEndpoint(object):
                 # on the master process that we should send to the client. (And
                 # importantly, not stack traces everywhere)
                 raise e.to_synapse_error()
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to talk to master"), e)
 
             defer.returnValue(result)
 
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b457c5563f..a3952506c1 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.replication.tcp.streams.events import (
 from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.relations import RelationsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
@@ -52,6 +53,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                        EventsWorkerStore,
                        SignatureWorkerStore,
                        UserErasureWorkerStore,
+                       RelationsWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
@@ -89,7 +91,7 @@ class SlavedEventStore(EventFederationWorkerStore,
             for row in rows:
                 self.invalidate_caches_for_event(
                     -token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts,
+                    row.redacts, row.relates_to,
                     backfilled=True,
                 )
         return super(SlavedEventStore, self).process_replication_rows(
@@ -102,7 +104,7 @@ class SlavedEventStore(EventFederationWorkerStore,
         if row.type == EventsStreamEventRow.TypeId:
             self.invalidate_caches_for_event(
                 token, data.event_id, data.room_id, data.type, data.state_key,
-                data.redacts,
+                data.redacts, data.relates_to,
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
@@ -114,7 +116,8 @@ class SlavedEventStore(EventFederationWorkerStore,
             raise Exception("Unknown events stream row type %s" % (row.type, ))
 
     def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
-                                    etype, state_key, redacts, backfilled):
+                                    etype, state_key, redacts, relates_to,
+                                    backfilled):
         self._invalidate_get_event_cache(event_id)
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
@@ -136,3 +139,8 @@ class SlavedEventStore(EventFederationWorkerStore,
                 state_key, stream_ordering
             )
             self.get_invited_rooms_for_user.invalidate((state_key,))
+
+        if relates_to:
+            self.get_relations_for_event.invalidate_many((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
+            self.get_applicable_edit.invalidate((relates_to,))
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 8971a6a22e..b6ce7a7bee 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -32,6 +32,7 @@ BackfillStreamRow = namedtuple("BackfillStreamRow", (
     "type",  # str
     "state_key",  # str, optional
     "redacts",  # str, optional
+    "relates_to",  # str, optional
 ))
 PresenceStreamRow = namedtuple("PresenceStreamRow", (
     "user_id",  # str
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index e0f6e29248..f1290d022a 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -80,11 +80,12 @@ class BaseEventsStreamRow(object):
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
-    event_id = attr.ib()   # str
-    room_id = attr.ib()    # str
-    type = attr.ib()       # str
-    state_key = attr.ib()  # str, optional
-    redacts = attr.ib()    # str, optional
+    event_id = attr.ib()    # str
+    room_id = attr.ib()     # str
+    type = attr.ib()        # str
+    state_key = attr.ib()   # str, optional
+    redacts = attr.ib()     # str, optional
+    relates_to = attr.ib()  # str, optional
 
 
 @attr.s(slots=True, frozen=True)