diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 64d8eb2af1..3bfd5e8213 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -16,13 +16,18 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
class SlavedDeviceInboxStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceInboxStore, self).__init__(db_conn, hs)
self._device_inbox_id_gen = SlavedIdTracker(
- db_conn, "device_inbox", "stream_id",
+ db_conn, "device_max_stream_id", "stream_id",
+ )
+ self._device_inbox_stream_cache = StreamChangeCache(
+ "DeviceInboxStreamChangeCache",
+ self._device_inbox_id_gen.get_current_token()
)
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
@@ -38,5 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
stream = result.get("to_device")
if stream:
self._device_inbox_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ stream_id = row[0]
+ user_id = row[1]
+ self._device_inbox_stream_cache.entity_has_changed(
+ user_id, stream_id
+ )
return super(SlavedDeviceInboxStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cbebd5b2f7..0c26e96e98 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -61,6 +61,9 @@ class SlavedEventStore(BaseSlavedStore):
"MembershipStreamChangeCache", events_max,
)
+ self.stream_ordering_month_ago = 0
+ self._stream_order_on_start = self.get_room_max_stream_ordering()
+
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
@@ -86,6 +89,9 @@ class SlavedEventStore(BaseSlavedStore):
_get_state_groups_from_groups = (
StateStore.__dict__["_get_state_groups_from_groups"]
)
+ _get_state_groups_from_groups_txn = (
+ DataStore._get_state_groups_from_groups_txn.__func__
+ )
_get_state_group_from_group = (
StateStore.__dict__["_get_state_group_from_group"]
)
@@ -165,6 +171,15 @@ class SlavedEventStore(BaseSlavedStore):
get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
_get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
+ get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
+
+ get_forward_extremeties_for_room = (
+ DataStore.get_forward_extremeties_for_room.__func__
+ )
+ _get_forward_extremeties_for_room = (
+ EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
+ )
+
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index d5bb0f98ea..23c613863f 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -15,7 +15,39 @@
from ._base import BaseSlavedStore
from synapse.storage import DataStore
+from ._slaved_id_tracker import SlavedIdTracker
class RoomStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(RoomStore, self).__init__(db_conn, hs)
+ self._public_room_id_gen = SlavedIdTracker(
+ db_conn, "public_room_list_stream", "stream_id"
+ )
+
get_public_room_ids = DataStore.get_public_room_ids.__func__
+ get_current_public_room_stream_id = (
+ DataStore.get_current_public_room_stream_id.__func__
+ )
+ get_public_room_ids_at_stream_id = (
+ DataStore.get_public_room_ids_at_stream_id.__func__
+ )
+ get_public_room_ids_at_stream_id_txn = (
+ DataStore.get_public_room_ids_at_stream_id_txn.__func__
+ )
+ get_published_at_stream_id_txn = (
+ DataStore.get_published_at_stream_id_txn.__func__
+ )
+ get_public_room_changes = DataStore.get_public_room_changes.__func__
+
+ def stream_positions(self):
+ result = super(RoomStore, self).stream_positions()
+ result["public_rooms"] = self._public_room_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("public_rooms")
+ if stream:
+ self._public_room_id_gen.advance(int(stream["position"]))
+
+ return super(RoomStore, self).process_replication(result)
|