diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index d8eb14592b..03930fe958 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -283,12 +283,12 @@ class ReplicationResource(Resource):
if request_events != upto_events_token:
writer.write_header_and_rows("events", res.new_forward_events, (
- "position", "internal", "json", "state_group"
+ "position", "event_id", "room_id", "type", "state_key",
), position=upto_events_token)
if request_backfill != upto_backfill_token:
writer.write_header_and_rows("backfill", res.new_backfill_events, (
- "position", "internal", "json", "state_group",
+ "position", "event_id", "room_id", "type", "state_key", "redacts",
), position=upto_backfill_token)
writer.write_header_and_rows(
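The hunk above stops streaming full event JSON and instead advertises a small set of per-row columns. As a rough sketch of what a consumer now receives for the "events" stream, assuming the writer keeps its existing position / field_names / rows layout (the row values below are invented):

    # Illustrative payload only: the top-level keys are assumed from the
    # existing writer, and the row values are made up.
    example_events_stream = {
        "events": {
            "position": 12345,
            "field_names": [
                "position", "event_id", "room_id", "type", "state_key", "redacts",
            ],
            "rows": [
                [12345, "$abc123:example.com", "!room:example.com",
                 "m.room.message", None, None],
            ],
        },
    }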
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 18076e0f3b..ab133db872 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -54,7 +54,9 @@ class BaseSlavedStore(SQLBaseStore):
try:
getattr(self, cache_func).invalidate(tuple(keys))
except AttributeError:
- logger.info("Got unexpected cache_func: %r", cache_func)
+ # We probably haven't pulled in the cache in this worker,
+ # which is fine.
+ pass
self._cache_id_gen.advance(int(stream["position"]))
return defer.succeed(None)
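The AttributeError branch above is expected on workers that simply do not mix in the store that owns the named cache, so it no longer deserves a log line. A minimal self-contained sketch of the pattern (the function and variable names here are invented):

    def invalidate_streamed_cache(store, cache_func_name, keys):
        # Invalidate a cache named by the replication stream, but only if
        # this worker actually has that cache; other caches belong to
        # stores this worker never pulled in.
        try:
            getattr(store, cache_func_name).invalidate(tuple(keys))
        except AttributeError:
            pass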
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
index 24b5c79d4a..9d1d173b2f 100644
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -27,4 +27,9 @@ class SlavedIdTracker(object):
self._current = (max if self.step > 0 else min)(self._current, new_id)
def get_current_token(self):
+ """
+
+ Returns:
+ int
+ """
return self._current
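For context, SlavedIdTracker loads its starting id from the database and then only moves forward (with the default positive step) as replication positions arrive. A hypothetical worker-side usage, with an illustrative table/column pair:

    # Illustrative only: any (table, column) holding a stream id works.
    tracker = SlavedIdTracker(db_conn, "events", "stream_ordering")
    tracker.advance(12345)                 # position taken from a replication stream
    current = tracker.get_current_token()  # int, per the docstring above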
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 735c03c7eb..77c64722c7 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -46,6 +46,12 @@ class SlavedAccountDataStore(BaseSlavedStore):
)
get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+ get_tags_for_room = (
+ DataStore.get_tags_for_room.__func__
+ )
+ get_account_data_for_room = (
+ DataStore.get_account_data_for_room.__func__
+ )
get_updated_tags = DataStore.get_updated_tags.__func__
get_updated_account_data_for_user = (
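This file mixes two re-use patterns: plain DataStore methods are borrowed via .__func__, while @cached methods are pulled from the defining class's __dict__ so the slaved store gets the caching descriptor (and hence its own cache) rather than an already-bound attribute. A self-contained toy of the first pattern under Python 2 semantics (class and method names are invented):

    class Master(object):
        def plain(self):
            return 1

    class Worker(object):
        # On Python 2, Master.plain is an unbound method; .__func__ unwraps
        # the underlying function so it can be re-bound to Worker instances.
        plain = Master.plain.__func__
        # A @cached method would instead be taken from the class __dict__,
        # e.g. TagsStore.__dict__["get_tags_for_user"] above, so the worker
        # class receives the cache descriptor itself.

    assert Worker().plain() == 1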
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index cc860f9f9b..f9102e0d89 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -17,6 +17,7 @@ from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.caches.expiringcache import ExpiringCache
class SlavedDeviceInboxStore(BaseSlavedStore):
@@ -34,6 +35,13 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
self._device_inbox_id_gen.get_current_token()
)
+ self._last_device_delete_cache = ExpiringCache(
+ cache_name="last_device_delete_cache",
+ clock=self._clock,
+ max_len=10000,
+ expiry_ms=30 * 60 * 1000,
+ )
+
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
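The ExpiringCache added above gives the worker the same short-lived memory of recent to-device deletions that the main store keeps. A hedged usage sketch; the (user_id, device_id) -> stream_id key/value shown is an assumption, and clock is assumed to be a synapse Clock already in scope:

    last_deletes = ExpiringCache(
        cache_name="last_device_delete_cache",
        clock=clock,
        max_len=10000,                 # bound on number of entries
        expiry_ms=30 * 60 * 1000,      # entries dropped ~30 minutes after insertion
    )
    last_deletes[("@user:example.com", "DEVICEID")] = 543
    already_deleted_up_to = last_deletes.get(("@user:example.com", "DEVICEID"), None)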
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index d72ff6055c..d4db1e452e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,7 +16,6 @@ from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.api.constants import EventTypes
-from synapse.events import FrozenEvent
from synapse.storage import DataStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.event_federation import EventFederationStore
@@ -25,7 +24,6 @@ from synapse.storage.state import StateStore
from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-import ujson as json
import logging
@@ -85,6 +83,12 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
+ _get_unread_counts_by_receipt_txn = (
+ DataStore._get_unread_counts_by_receipt_txn.__func__
+ )
+ _get_unread_counts_by_pos_txn = (
+ DataStore._get_unread_counts_by_pos_txn.__func__
+ )
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)
@@ -103,6 +107,10 @@ class SlavedEventStore(BaseSlavedStore):
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
)
+ get_current_state_ids = (
+ StateStore.__dict__["get_current_state_ids"]
+ )
+ has_room_changed_since = DataStore.has_room_changed_since.__func__
get_unread_push_actions_for_user_in_range_for_http = (
DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
@@ -159,7 +167,6 @@ class SlavedEventStore(BaseSlavedStore):
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
- _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
_get_state_for_groups = DataStore._get_state_for_groups.__func__
_get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
_get_events_around_txn = DataStore._get_events_around_txn.__func__
@@ -232,46 +239,32 @@ class SlavedEventStore(BaseSlavedStore):
return super(SlavedEventStore, self).process_replication(result)
def _process_replication_row(self, row, backfilled):
- internal = json.loads(row[1])
- event_json = json.loads(row[2])
- event = FrozenEvent(event_json, internal_metadata_dict=internal)
+ stream_ordering = row[0] if not backfilled else -row[0]
self.invalidate_caches_for_event(
- event, backfilled,
+ stream_ordering, row[1], row[2], row[3], row[4], row[5],
+ backfilled=backfilled,
)
- def invalidate_caches_for_event(self, event, backfilled):
- self._invalidate_get_event_cache(event.event_id)
+ def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
+ etype, state_key, redacts, backfilled):
+ self._invalidate_get_event_cache(event_id)
- self.get_latest_event_ids_in_room.invalidate((event.room_id,))
+ self.get_latest_event_ids_in_room.invalidate((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
- (event.room_id,)
+ (room_id,)
)
if not backfilled:
self._events_stream_cache.entity_has_changed(
- event.room_id, event.internal_metadata.stream_ordering
+ room_id, stream_ordering
)
- # self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
- # (event.room_id,)
- # )
-
- if event.type == EventTypes.Redaction:
- self._invalidate_get_event_cache(event.redacts)
+ if redacts:
+ self._invalidate_get_event_cache(redacts)
- if event.type == EventTypes.Member:
+ if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(
- event.state_key, event.internal_metadata.stream_ordering
+ state_key, stream_ordering
)
- self.get_invited_rooms_for_user.invalidate((event.state_key,))
-
- if not event.is_state():
- return
-
- if backfilled:
- return
-
- if (not event.internal_metadata.is_invite_from_remote()
- and event.internal_metadata.is_outlier()):
- return
+ self.get_invited_rooms_for_user.invalidate((state_key,))
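With this change a replicated event is just (position, event_id, room_id, type, state_key, redacts), and cache invalidation works from those fields alone instead of a rehydrated FrozenEvent. An illustrative mapping of one streamed row onto the new method (the row values and the store variable are invented):

    row = [
        543,                     # position (stream ordering)
        "$abc123:example.com",   # event_id
        "!room:example.com",     # room_id
        "m.room.redaction",      # type
        None,                    # state_key
        "$older:example.com",    # redacts
    ]
    stream_ordering = row[0]     # negated first when the row came from backfill
    store.invalidate_caches_for_event(
        stream_ordering, row[1], row[2], row[3], row[4], row[5],
        backfilled=False,
    )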
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 703f4a49bf..e4a2414d78 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.storage import DataStore
+from synapse.storage.presence import PresenceStore
class SlavedPresenceStore(BaseSlavedStore):
@@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore):
_get_active_presence = DataStore._get_active_presence.__func__
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
- get_presence_for_users = DataStore.get_presence_for_users.__func__
+ _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
+ get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
@@ -55,5 +57,6 @@ class SlavedPresenceStore(BaseSlavedStore):
self.presence_stream_cache.entity_has_changed(
user_id, position
)
+ self._get_presence_for_user.invalidate((user_id,))
return super(SlavedPresenceStore, self).process_replication(result)
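_get_presence_for_user and get_presence_for_users are taken from PresenceStore.__dict__ because, as this diff relies on, they form a cached / cachedList pair sharing one per-user cache, which is why invalidating _get_presence_for_user is enough to keep both coherent. A rough sketch of that pairing; the decorator arguments and method bodies are illustrative, not copied from PresenceStore:

    from synapse.util.caches.descriptors import cached, cachedList

    class ExamplePresenceStore(object):
        @cached()
        def _get_presence_for_user(self, user_id):
            # per-user lookup; owns the cache invalidated above
            raise NotImplementedError()

        @cachedList(cached_method_name="_get_presence_for_user",
                    list_name="user_ids")
        def get_presence_for_users(self, user_ids):
            # batch variant that fills/reads the per-user cache
            raise NotImplementedError()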