diff options
author | Erik Johnston <erik@matrix.org> | 2017-04-11 11:12:37 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-04-11 11:12:37 +0100 |
commit | 4902db1fc978d1c9a3681720cffc4dbb9d72dbea (patch) | |
tree | ee78b86f62119a7605ab2fdd1bc8afd3953e7978 /synapse/replication | |
parent | Merge branch 'release-v0.19.3' of github.com:matrix-org/synapse (diff) | |
parent | Bump changelog (diff) | |
download | synapse-4902db1fc978d1c9a3681720cffc4dbb9d72dbea.tar.xz |
Merge branch 'release-v0.20.0' of github.com:matrix-org/synapse v0.20.0
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/resource.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/_slaved_id_tracker.py | 5 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 49 | ||||
-rw-r--r-- | synapse/replication/slave/storage/presence.py | 1 |
4 files changed, 26 insertions, 33 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index d8eb14592b..03930fe958 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -283,12 +283,12 @@ class ReplicationResource(Resource): if request_events != upto_events_token: writer.write_header_and_rows("events", res.new_forward_events, ( - "position", "internal", "json", "state_group" + "position", "event_id", "room_id", "type", "state_key", ), position=upto_events_token) if request_backfill != upto_backfill_token: writer.write_header_and_rows("backfill", res.new_backfill_events, ( - "position", "internal", "json", "state_group", + "position", "event_id", "room_id", "type", "state_key", "redacts", ), position=upto_backfill_token) writer.write_header_and_rows( diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index 24b5c79d4a..9d1d173b2f 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -27,4 +27,9 @@ class SlavedIdTracker(object): self._current = (max if self.step > 0 else min)(self._current, new_id) def get_current_token(self): + """ + + Returns: + int + """ return self._current diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 622b2d8540..d4db1e452e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,7 +16,6 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.api.constants import EventTypes -from synapse.events import FrozenEvent from synapse.storage import DataStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore @@ -25,7 +24,6 @@ from synapse.storage.state import StateStore from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache -import ujson as json import logging @@ -109,6 +107,10 @@ class SlavedEventStore(BaseSlavedStore): get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) + get_current_state_ids = ( + StateStore.__dict__["get_current_state_ids"] + ) + has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ @@ -165,7 +167,6 @@ class SlavedEventStore(BaseSlavedStore): _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) - _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ _get_state_for_groups = DataStore._get_state_for_groups.__func__ _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ _get_events_around_txn = DataStore._get_events_around_txn.__func__ @@ -238,46 +239,32 @@ class SlavedEventStore(BaseSlavedStore): return super(SlavedEventStore, self).process_replication(result) def _process_replication_row(self, row, backfilled): - internal = json.loads(row[1]) - event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) + stream_ordering = row[0] if not backfilled else -row[0] self.invalidate_caches_for_event( - event, backfilled, + stream_ordering, row[1], row[2], row[3], row[4], row[5], + backfilled=backfilled, ) - def invalidate_caches_for_event(self, event, backfilled): - self._invalidate_get_event_cache(event.event_id) + def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, + etype, state_key, redacts, backfilled): + self._invalidate_get_event_cache(event_id) - self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + self.get_latest_event_ids_in_room.invalidate((room_id,)) self.get_unread_event_push_actions_by_room_for_user.invalidate_many( - (event.room_id,) + (room_id,) ) if not backfilled: self._events_stream_cache.entity_has_changed( - event.room_id, event.internal_metadata.stream_ordering + room_id, stream_ordering ) - # self.get_unread_event_push_actions_by_room_for_user.invalidate_many( - # (event.room_id,) - # ) - - if event.type == EventTypes.Redaction: - self._invalidate_get_event_cache(event.redacts) + if redacts: + self._invalidate_get_event_cache(redacts) - if event.type == EventTypes.Member: + if etype == EventTypes.Member: self._membership_stream_cache.entity_has_changed( - event.state_key, event.internal_metadata.stream_ordering + state_key, stream_ordering ) - self.get_invited_rooms_for_user.invalidate((event.state_key,)) - - if not event.is_state(): - return - - if backfilled: - return - - if (not event.internal_metadata.is_invite_from_remote() - and event.internal_metadata.is_outlier()): - return + self.get_invited_rooms_for_user.invalidate((state_key,)) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 40f6c9a386..e4a2414d78 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -57,5 +57,6 @@ class SlavedPresenceStore(BaseSlavedStore): self.presence_stream_cache.entity_has_changed( user_id, position ) + self._get_presence_for_user.invalidate((user_id,)) return super(SlavedPresenceStore, self).process_replication(result) |