diff options
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/slave/storage/devices.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 5 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/streams.py | 22 |
4 files changed, 29 insertions, 3 deletions
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 4d4a435471..7687867aee 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore +from synapse.storage.end_to_end_keys import EndToEndKeyStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -45,6 +46,7 @@ class SlavedDeviceStore(BaseSlavedStore): _mark_as_sent_devices_by_remote_txn = ( DataStore._mark_as_sent_devices_by_remote_txn.__func__ ) + count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"] def stream_positions(self): result = super(SlavedDeviceStore, self).stream_positions() diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index fcaf58b93b..94ebbffc1b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore): get_current_state_ids = ( StateStore.__dict__["get_current_state_ids"] ) + get_state_group_delta = StateStore.__dict__["get_state_group_delta"] + _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( @@ -151,8 +153,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_rooms = ( DataStore.get_room_events_stream_for_rooms.__func__ ) - is_host_joined = DataStore.is_host_joined.__func__ - _is_host_joined = RoomMemberStore.__dict__["_is_host_joined"] + is_host_joined = RoomMemberStore.__dict__["is_host_joined"] get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ _set_before_and_after = staticmethod(DataStore._set_before_and_after) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 8b2c4c3043..69c46911ec 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -67,6 +67,7 @@ class ReplicationStreamer(object): self.store = hs.get_datastore() self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() + self.notifier = hs.get_notifier() # Current connections. self.connections = [] @@ -99,7 +100,7 @@ class ReplicationStreamer(object): if not hs.config.send_federation: self.federation_sender = hs.get_federation_sender() - hs.get_notifier().add_replication_callback(self.on_notifier_poke) + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 369d5f2428..fbafe12cc2 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) +CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( + "room_id", # str + "type", # str + "state_key", # str + "event_id", # str, optional +)) class Stream(object): @@ -443,6 +449,21 @@ class AccountDataStream(Stream): defer.returnValue(results) +class CurrentStateDeltaStream(Stream): + """Current state for a room was changed + """ + NAME = "current_state_deltas" + ROW_TYPE = CurrentStateDeltaStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_current_state_delta_stream_id + self.update_function = store.get_all_updated_current_state_deltas + + super(CurrentStateDeltaStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -460,5 +481,6 @@ STREAMS_MAP = { FederationStream, TagAccountDataStream, AccountDataStream, + CurrentStateDeltaStream, ) } |