summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/synchrotron.py12
-rw-r--r--synapse/replication/resource.py4
-rw-r--r--synapse/replication/slave/storage/events.py44
-rw-r--r--synapse/storage/events.py31
-rw-r--r--tests/replication/test_resource.py2
5 files changed, 39 insertions, 54 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index a68bb873fe..34e34e5580 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.events import FrozenEvent
 from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
@@ -411,11 +410,16 @@ class SynchrotronServer(HomeServer):
             stream = result.get("events")
             if stream:
                 max_position = stream["position"]
+
+                event_map = yield store.get_events([row[1] for row in stream["rows"]])
+
                 for row in stream["rows"]:
                     position = row[0]
-                    internal = json.loads(row[1])
-                    event_json = json.loads(row[2])
-                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
+                    event_id = row[1]
+                    event = event_map.get(event_id, None)
+                    if not event:
+                        continue
+
                     extra_users = ()
                     if event.type == EventTypes.Member:
                         extra_users = (event.state_key,)
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index d8eb14592b..03930fe958 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -283,12 +283,12 @@ class ReplicationResource(Resource):
 
             if request_events != upto_events_token:
                 writer.write_header_and_rows("events", res.new_forward_events, (
-                    "position", "internal", "json", "state_group"
+                    "position", "event_id", "room_id", "type", "state_key",
                 ), position=upto_events_token)
 
             if request_backfill != upto_backfill_token:
                 writer.write_header_and_rows("backfill", res.new_backfill_events, (
-                    "position", "internal", "json", "state_group",
+                    "position", "event_id", "room_id", "type", "state_key", "redacts",
                 ), position=upto_backfill_token)
 
             writer.write_header_and_rows(
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 518c9ea2e9..a1e1e54e5b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,7 +16,6 @@ from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 from synapse.api.constants import EventTypes
-from synapse.events import FrozenEvent
 from synapse.storage import DataStore
 from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
@@ -25,7 +24,6 @@ from synapse.storage.state import StateStore
 from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-import ujson as json
 import logging
 
 
@@ -242,46 +240,32 @@ class SlavedEventStore(BaseSlavedStore):
         return super(SlavedEventStore, self).process_replication(result)
 
     def _process_replication_row(self, row, backfilled):
-        internal = json.loads(row[1])
-        event_json = json.loads(row[2])
-        event = FrozenEvent(event_json, internal_metadata_dict=internal)
+        stream_ordering = row[0] if not backfilled else -row[0]
         self.invalidate_caches_for_event(
-            event, backfilled,
+            stream_ordering, row[1], row[2], row[3], row[4], row[5],
+            backfilled=backfilled,
         )
 
-    def invalidate_caches_for_event(self, event, backfilled):
-        self._invalidate_get_event_cache(event.event_id)
+    def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
+                                    etype, state_key, redacts, backfilled):
+        self._invalidate_get_event_cache(event_id)
 
-        self.get_latest_event_ids_in_room.invalidate((event.room_id,))
+        self.get_latest_event_ids_in_room.invalidate((room_id,))
 
         self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-            (event.room_id,)
+            (room_id,)
         )
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(
-                event.room_id, event.internal_metadata.stream_ordering
+                room_id, stream_ordering
             )
 
-        # self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-        #     (event.room_id,)
-        # )
+        if redacts:
+            self._invalidate_get_event_cache(redacts)
 
-        if event.type == EventTypes.Redaction:
-            self._invalidate_get_event_cache(event.redacts)
-
-        if event.type == EventTypes.Member:
+        if etype == EventTypes.Member:
             self._membership_stream_cache.entity_has_changed(
-                event.state_key, event.internal_metadata.stream_ordering
+                state_key, stream_ordering
             )
-            self.get_invited_rooms_for_user.invalidate((event.state_key,))
-
-        if not event.is_state():
-            return
-
-        if backfilled:
-            return
-
-        if (not event.internal_metadata.is_invite_from_remote()
-                and event.internal_metadata.is_outlier()):
-            return
+            self.get_invited_rooms_for_user.invalidate((state_key,))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4d3cfc336f..3c8393bfe8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1773,14 +1773,13 @@ class EventsStore(SQLBaseStore):
 
         def get_all_new_events_txn(txn):
             sql = (
-                "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
-                " FROM events as e"
-                " JOIN event_json as ej"
-                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
-                " LEFT JOIN event_to_state_groups as eg"
-                " ON e.event_id = eg.event_id"
-                " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
-                " ORDER BY e.stream_ordering ASC"
+                "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
             if have_forward_events:
@@ -1806,15 +1805,13 @@ class EventsStore(SQLBaseStore):
                 forward_ex_outliers = []
 
             sql = (
-                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
-                " eg.state_group"
-                " FROM events as e"
-                " JOIN event_json as ej"
-                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
-                " LEFT JOIN event_to_state_groups as eg"
-                " ON e.event_id = eg.event_id"
-                " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
-                " ORDER BY e.stream_ordering DESC"
+                "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                " ORDER BY stream_ordering DESC"
                 " LIMIT ?"
             )
             if have_backfill_events:
diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py
index 93b9fad012..429b37e360 100644
--- a/tests/replication/test_resource.py
+++ b/tests/replication/test_resource.py
@@ -68,7 +68,7 @@ class ReplicationResourceCase(unittest.TestCase):
         code, body = yield get
         self.assertEquals(code, 200)
         self.assertEquals(body["events"]["field_names"], [
-            "position", "internal", "json", "state_group"
+            "position", "event_id", "room_id", "type", "state_key",
         ])
 
     @defer.inlineCallbacks