summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py2
-rw-r--r--synapse/replication/http/federation.py49
-rw-r--r--synapse/replication/http/send_event.py14
-rw-r--r--synapse/replication/slave/storage/events.py20
-rw-r--r--synapse/replication/tcp/resource.py6
-rw-r--r--synapse/replication/tcp/streams/_base.py2
6 files changed, 81 insertions, 12 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 444eb7b7f4..1be1ccbdf3 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -44,7 +44,7 @@ class ReplicationEndpoint(object):
     """Helper base class for defining new replication HTTP endpoints.
 
     This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
-    (with an `/:txn_id` prefix for cached requests.), where NAME is a name,
+    (with a `/:txn_id` suffix for cached requests), where NAME is a name,
     PATH_ARGS are a tuple of parameters to be encoded in the URL.
 
     For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 49a3251372..7e23b565b9 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,7 +17,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.events import event_type_from_format_version
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import make_event_from_dict
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
@@ -37,6 +38,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         {
             "events": [{
                 "event": { .. serialized event .. },
+                "room_version": .., // "1", "2", "3", etc: the version of the room
+                                    // containing the event
+                "event_format_version": .., // 1,2,3 etc: the event format version
                 "internal_metadata": { .. serialized internal_metadata .. },
                 "rejected_reason": ..,   // The event.rejected_reason field
                 "context": { .. serialized event context .. },
@@ -72,6 +76,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
             event_payloads.append(
                 {
                     "event": event.get_pdu_json(),
+                    "room_version": event.room_version.identifier,
                     "event_format_version": event.format_version,
                     "internal_metadata": event.internal_metadata.get_dict(),
                     "rejected_reason": event.rejected_reason,
@@ -94,12 +99,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
             event_and_contexts = []
             for event_payload in event_payloads:
                 event_dict = event_payload["event"]
-                format_ver = event_payload["event_format_version"]
+                room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
                 internal_metadata = event_payload["internal_metadata"]
                 rejected_reason = event_payload["rejected_reason"]
 
-                EventType = event_type_from_format_version(format_ver)
-                event = EventType(event_dict, internal_metadata, rejected_reason)
+                event = make_event_from_dict(
+                    event_dict, room_ver, internal_metadata, rejected_reason
+                )
 
                 context = EventContext.deserialize(
                     self.storage, event_payload["context"]
@@ -211,7 +217,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
 
     Request format:
 
-        POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+        POST /_synapse/replication/fed_cleanup_room/:room_id/:txn_id
 
         {}
     """
@@ -238,8 +244,41 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
+class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
+    """Called to clean up any data in DB for a given room, ready for the
+    server to join the room.
+
+    Request format:
+
+        POST /_synapse/replication/store_room_on_invite/:room_id/:txn_id
+
+        {
+            "room_version": "1",
+        }
+    """
+
+    NAME = "store_room_on_invite"
+    PATH_ARGS = ("room_id",)
+
+    def __init__(self, hs):
+        super().__init__(hs)
+
+        self.store = hs.get_datastore()
+
+    @staticmethod
+    def _serialize_payload(room_id, room_version):
+        return {"room_version": room_version.identifier}
+
+    async def _handle_request(self, request, room_id):
+        content = parse_json_object_from_request(request)
+        room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
+        await self.store.maybe_store_room_on_invite(room_id, room_version)
+        return 200, {}
+
+
 def register_servlets(hs, http_server):
     ReplicationFederationSendEventsRestServlet(hs).register(http_server)
     ReplicationFederationSendEduRestServlet(hs).register(http_server)
     ReplicationGetQueryRestServlet(hs).register(http_server)
     ReplicationCleanRoomRestServlet(hs).register(http_server)
+    ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 84b92f16ad..b74b088ff4 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -17,7 +17,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.events import event_type_from_format_version
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import make_event_from_dict
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
@@ -37,6 +38,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
         {
             "event": { .. serialized event .. },
+            "room_version": .., // "1", "2", "3", etc: the version of the room
+                                // containing the event
+            "event_format_version": .., // 1,2,3 etc: the event format version
             "internal_metadata": { .. serialized internal_metadata .. },
             "rejected_reason": ..,   // The event.rejected_reason field
             "context": { .. serialized event context .. },
@@ -77,6 +81,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
         payload = {
             "event": event.get_pdu_json(),
+            "room_version": event.room_version.identifier,
             "event_format_version": event.format_version,
             "internal_metadata": event.internal_metadata.get_dict(),
             "rejected_reason": event.rejected_reason,
@@ -93,12 +98,13 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             content = parse_json_object_from_request(request)
 
             event_dict = content["event"]
-            format_ver = content["event_format_version"]
+            room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
             internal_metadata = content["internal_metadata"]
             rejected_reason = content["rejected_reason"]
 
-            EventType = event_type_from_format_version(format_ver)
-            event = EventType(event_dict, internal_metadata, rejected_reason)
+            event = make_event_from_dict(
+                event_dict, room_ver, internal_metadata, rejected_reason
+            )
 
             requester = Requester.deserialize(self.store, content["requester"])
             context = EventContext.deserialize(self.storage, content["context"])
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 3aa6cb8b96..e73342c657 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -32,6 +32,7 @@ from synapse.storage.data_stores.main.state import StateGroupWorkerStore
 from synapse.storage.data_stores.main.stream import StreamWorkerStore
 from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore
 from synapse.storage.database import Database
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
@@ -68,6 +69,21 @@ class SlavedEventStore(
 
         super(SlavedEventStore, self).__init__(database, db_conn, hs)
 
+        events_max = self._stream_id_gen.get_current_token()
+        curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
+            db_conn,
+            "current_state_delta_stream",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=events_max,  # As we share the stream id with events token
+            limit=1000,
+        )
+        self._curr_state_delta_stream_cache = StreamChangeCache(
+            "_curr_state_delta_stream_cache",
+            min_curr_state_delta_id,
+            prefilled_cache=curr_state_delta_prefill,
+        )
+
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
 
@@ -120,6 +136,10 @@ class SlavedEventStore(
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
+            self._curr_state_delta_stream_cache.entity_has_changed(
+                row.data.room_id, token
+            )
+
             if data.type == EventTypes.Member:
                 self.get_rooms_for_user_with_stream_ordering.invalidate(
                     (data.state_key,)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index ce60ae2e07..ce9d1fae12 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -323,7 +323,11 @@ class ReplicationStreamer(object):
 
         # We need to tell the presence handler that the connection has been
         # lost so that it can handle any ongoing syncs on that connection.
-        self.presence_handler.update_external_syncs_clear(connection.conn_id)
+        run_as_background_process(
+            "update_external_syncs_clear",
+            self.presence_handler.update_external_syncs_clear,
+            connection.conn_id,
+        )
 
 
 def _batch_updates(updates):
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index a8d568b14a..208e8a667b 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -24,7 +24,7 @@ import attr
 logger = logging.getLogger(__name__)
 
 
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
 
 BackfillStreamRow = namedtuple(
     "BackfillStreamRow",