summary refs log tree commit diff
path: root/synapse/replication/slave/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-03-21 13:11:15 +0000
committerGitHub <noreply@github.com>2017-03-21 13:11:15 +0000
commit37a187bfabdd6d39630a04739e822d0d1fa7daea (patch)
tree7372b8cb70c6ae240e4465363607338df3d76c96 /synapse/replication/slave/storage
parentMerge pull request #2035 from matrix-org/rav/debug_federation (diff)
parentFix unit test (diff)
downloadsynapse-37a187bfabdd6d39630a04739e822d0d1fa7daea.tar.xz
Merge pull request #2033 from matrix-org/erikj/repl_speed
Don't send the full event json over replication
Diffstat (limited to 'synapse/replication/slave/storage')
-rw-r--r--synapse/replication/slave/storage/events.py44
1 files changed, 14 insertions, 30 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 518c9ea2e9..a1e1e54e5b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,7 +16,6 @@ from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 from synapse.api.constants import EventTypes
-from synapse.events import FrozenEvent
 from synapse.storage import DataStore
 from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
@@ -25,7 +24,6 @@ from synapse.storage.state import StateStore
 from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-import ujson as json
 import logging
 
 
@@ -242,46 +240,32 @@ class SlavedEventStore(BaseSlavedStore):
         return super(SlavedEventStore, self).process_replication(result)
 
     def _process_replication_row(self, row, backfilled):
-        internal = json.loads(row[1])
-        event_json = json.loads(row[2])
-        event = FrozenEvent(event_json, internal_metadata_dict=internal)
+        stream_ordering = row[0] if not backfilled else -row[0]
         self.invalidate_caches_for_event(
-            event, backfilled,
+            stream_ordering, row[1], row[2], row[3], row[4], row[5],
+            backfilled=backfilled,
         )
 
-    def invalidate_caches_for_event(self, event, backfilled):
-        self._invalidate_get_event_cache(event.event_id)
+    def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
+                                    etype, state_key, redacts, backfilled):
+        self._invalidate_get_event_cache(event_id)
 
-        self.get_latest_event_ids_in_room.invalidate((event.room_id,))
+        self.get_latest_event_ids_in_room.invalidate((room_id,))
 
         self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-            (event.room_id,)
+            (room_id,)
         )
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(
-                event.room_id, event.internal_metadata.stream_ordering
+                room_id, stream_ordering
             )
 
-        # self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-        #     (event.room_id,)
-        # )
+        if redacts:
+            self._invalidate_get_event_cache(redacts)
 
-        if event.type == EventTypes.Redaction:
-            self._invalidate_get_event_cache(event.redacts)
-
-        if event.type == EventTypes.Member:
+        if etype == EventTypes.Member:
             self._membership_stream_cache.entity_has_changed(
-                event.state_key, event.internal_metadata.stream_ordering
+                state_key, stream_ordering
             )
-            self.get_invited_rooms_for_user.invalidate((event.state_key,))
-
-        if not event.is_state():
-            return
-
-        if backfilled:
-            return
-
-        if (not event.internal_metadata.is_invite_from_remote()
-                and event.internal_metadata.is_outlier()):
-            return
+            self.get_invited_rooms_for_user.invalidate((state_key,))