diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 4d4a435471..7687867aee 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -16,6 +16,7 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
+from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -45,6 +46,7 @@ class SlavedDeviceStore(BaseSlavedStore):
_mark_as_sent_devices_by_remote_txn = (
DataStore._mark_as_sent_devices_by_remote_txn.__func__
)
+ count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]
def stream_positions(self):
result = super(SlavedDeviceStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index fcaf58b93b..94ebbffc1b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore):
get_current_state_ids = (
StateStore.__dict__["get_current_state_ids"]
)
+ get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
+ _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
has_room_changed_since = DataStore.has_room_changed_since.__func__
get_unread_push_actions_for_user_in_range_for_http = (
@@ -151,8 +153,7 @@ class SlavedEventStore(BaseSlavedStore):
get_room_events_stream_for_rooms = (
DataStore.get_room_events_stream_for_rooms.__func__
)
- is_host_joined = DataStore.is_host_joined.__func__
- _is_host_joined = RoomMemberStore.__dict__["_is_host_joined"]
+ is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
_set_before_and_after = staticmethod(DataStore._set_before_and_after)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8b2c4c3043..69c46911ec 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -67,6 +67,7 @@ class ReplicationStreamer(object):
self.store = hs.get_datastore()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
# Current connections.
self.connections = []
@@ -99,7 +100,7 @@ class ReplicationStreamer(object):
if not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()
- hs.get_notifier().add_replication_callback(self.on_notifier_poke)
+ self.notifier.add_replication_callback(self.on_notifier_poke)
# Keeps track of whether we are currently checking for updates
self.is_looping = False
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 369d5f2428..fbafe12cc2 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
"data_type", # str
"data", # dict
))
+CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
+ "room_id", # str
+ "type", # str
+ "state_key", # str
+ "event_id", # str, optional
+))
class Stream(object):
@@ -443,6 +449,21 @@ class AccountDataStream(Stream):
defer.returnValue(results)
+class CurrentStateDeltaStream(Stream):
+ """Current state for a room was changed
+ """
+ NAME = "current_state_deltas"
+ ROW_TYPE = CurrentStateDeltaStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_max_current_state_delta_stream_id
+ self.update_function = store.get_all_updated_current_state_deltas
+
+ super(CurrentStateDeltaStream, self).__init__(hs)
+
+
STREAMS_MAP = {
stream.NAME: stream
for stream in (
@@ -460,5 +481,6 @@ STREAMS_MAP = {
FederationStream,
TagAccountDataStream,
AccountDataStream,
+ CurrentStateDeltaStream,
)
}
|