diff options
author | Erik Johnston <erikj@jki.re> | 2017-03-21 13:11:15 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-03-21 13:11:15 +0000 |
commit | 37a187bfabdd6d39630a04739e822d0d1fa7daea (patch) | |
tree | 7372b8cb70c6ae240e4465363607338df3d76c96 | |
parent | Merge pull request #2035 from matrix-org/rav/debug_federation (diff) | |
parent | Fix unit test (diff) | |
download | synapse-37a187bfabdd6d39630a04739e822d0d1fa7daea.tar.xz |
Merge pull request #2033 from matrix-org/erikj/repl_speed
Don't send the full event json over replication
-rw-r--r-- | synapse/app/synchrotron.py | 12 | ||||
-rw-r--r-- | synapse/replication/resource.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 44 | ||||
-rw-r--r-- | synapse/storage/events.py | 31 | ||||
-rw-r--r-- | tests/replication/test_resource.py | 2 |
5 files changed, 39 insertions, 54 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a68bb873fe..34e34e5580 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite from synapse.http.server import JsonResource @@ -411,11 +410,16 @@ class SynchrotronServer(HomeServer): stream = result.get("events") if stream: max_position = stream["position"] + + event_map = yield store.get_events([row[1] for row in stream["rows"]]) + for row in stream["rows"]: position = row[0] - internal = json.loads(row[1]) - event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) + event_id = row[1] + event = event_map.get(event_id, None) + if not event: + continue + extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index d8eb14592b..03930fe958 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -283,12 +283,12 @@ class ReplicationResource(Resource): if request_events != upto_events_token: writer.write_header_and_rows("events", res.new_forward_events, ( - "position", "internal", "json", "state_group" + "position", "event_id", "room_id", "type", "state_key", ), position=upto_events_token) if request_backfill != upto_backfill_token: writer.write_header_and_rows("backfill", res.new_backfill_events, ( - "position", "internal", "json", "state_group", + "position", "event_id", "room_id", "type", "state_key", "redacts", ), position=upto_backfill_token) writer.write_header_and_rows( diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 518c9ea2e9..a1e1e54e5b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,7 +16,6 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.api.constants import EventTypes -from synapse.events import FrozenEvent from synapse.storage import DataStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore @@ -25,7 +24,6 @@ from synapse.storage.state import StateStore from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache -import ujson as json import logging @@ -242,46 +240,32 @@ class SlavedEventStore(BaseSlavedStore): return super(SlavedEventStore, self).process_replication(result) def _process_replication_row(self, row, backfilled): - internal = json.loads(row[1]) - event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) + stream_ordering = row[0] if not backfilled else -row[0] self.invalidate_caches_for_event( - event, backfilled, + stream_ordering, row[1], row[2], row[3], row[4], row[5], + backfilled=backfilled, ) - def invalidate_caches_for_event(self, event, backfilled): - self._invalidate_get_event_cache(event.event_id) + def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, + etype, state_key, redacts, backfilled): + self._invalidate_get_event_cache(event_id) - self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + self.get_latest_event_ids_in_room.invalidate((room_id,)) self.get_unread_event_push_actions_by_room_for_user.invalidate_many( - (event.room_id,) + (room_id,) ) if not backfilled: self._events_stream_cache.entity_has_changed( - event.room_id, event.internal_metadata.stream_ordering + room_id, stream_ordering ) - # self.get_unread_event_push_actions_by_room_for_user.invalidate_many( - # (event.room_id,) - # ) + if redacts: + self._invalidate_get_event_cache(redacts) - if event.type == EventTypes.Redaction: - self._invalidate_get_event_cache(event.redacts) - - if event.type == EventTypes.Member: + if etype == EventTypes.Member: self._membership_stream_cache.entity_has_changed( - event.state_key, event.internal_metadata.stream_ordering + state_key, stream_ordering ) - self.get_invited_rooms_for_user.invalidate((event.state_key,)) - - if not event.is_state(): - return - - if backfilled: - return - - if (not event.internal_metadata.is_invite_from_remote() - and event.internal_metadata.is_outlier()): - return + self.get_invited_rooms_for_user.invalidate((state_key,)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4d3cfc336f..3c8393bfe8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1773,14 +1773,13 @@ class EventsStore(SQLBaseStore): def get_all_new_events_txn(txn): sql = ( - "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group" - " FROM events as e" - " JOIN event_json as ej" - " ON e.event_id = ej.event_id AND e.room_id = ej.room_id" - " LEFT JOIN event_to_state_groups as eg" - " ON e.event_id = eg.event_id" - " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?" - " ORDER BY e.stream_ordering ASC" + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" " LIMIT ?" ) if have_forward_events: @@ -1806,15 +1805,13 @@ class EventsStore(SQLBaseStore): forward_ex_outliers = [] sql = ( - "SELECT -e.stream_ordering, ej.internal_metadata, ej.json," - " eg.state_group" - " FROM events as e" - " JOIN event_json as ej" - " ON e.event_id = ej.event_id AND e.room_id = ej.room_id" - " LEFT JOIN event_to_state_groups as eg" - " ON e.event_id = eg.event_id" - " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?" - " ORDER BY e.stream_ordering DESC" + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " ORDER BY stream_ordering DESC" " LIMIT ?" ) if have_backfill_events: diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 93b9fad012..429b37e360 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -68,7 +68,7 @@ class ReplicationResourceCase(unittest.TestCase): code, body = yield get self.assertEquals(code, 200) self.assertEquals(body["events"]["field_names"], [ - "position", "internal", "json", "state_group" + "position", "event_id", "room_id", "type", "state_key", ]) @defer.inlineCallbacks |