diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index ff78c60f13..0e983ae7fa 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -159,6 +159,15 @@ class ReplicationResource(Resource):
result = yield self.notifier.wait_for_replication(replicate, timeout)
+ for stream_name, stream_content in result.items():
+ logger.info(
+ "Replicating %d rows of %s from %s -> %s",
+ len(stream_content["rows"]),
+ stream_name,
+ request_streams.get(stream_name),
+ stream_content["position"],
+ )
+
request.write(json.dumps(result, ensure_ascii=False))
finish_request(request)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
new file mode 100644
index 0000000000..f59b0eabbc
--- /dev/null
+++ b/synapse/replication/slave/storage/account_data.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.account_data import AccountDataStore
+
+
+class SlavedAccountDataStore(BaseSlavedStore):
+
+ def __init__(self, db_conn, hs):
+ super(SlavedAccountDataStore, self).__init__(db_conn, hs)
+ self._account_data_id_gen = SlavedIdTracker(
+ db_conn, "account_data_max_stream_id", "stream_id",
+ )
+
+ get_global_account_data_by_type_for_users = (
+ AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
+ )
+
+ get_global_account_data_by_type_for_user = (
+ AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
+ )
+
+ def stream_positions(self):
+ result = super(SlavedAccountDataStore, self).stream_positions()
+ position = self._account_data_id_gen.get_current_token()
+ result["user_account_data"] = position
+ result["room_account_data"] = position
+ result["tag_account_data"] = position
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("user_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ user_id, data_type = row[1:3]
+ self.get_global_account_data_by_type_for_user.invalidate(
+ (data_type, user_id,)
+ )
+
+ stream = result.get("room_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
+
+ stream = result.get("tag_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 86f00b6ff5..c0d741452d 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
+ _get_state_group_for_events = (
+ StateStore.__dict__["_get_state_group_for_events"]
+ )
+ _get_state_group_for_event = (
+ StateStore.__dict__["_get_state_group_for_event"]
+ )
+ _get_state_groups_from_groups = (
+ StateStore.__dict__["_get_state_groups_from_groups"]
+ )
+ _get_state_group_from_group = (
+ StateStore.__dict__["_get_state_group_from_group"]
+ )
get_unread_push_actions_for_user_in_range = (
DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -83,6 +95,7 @@ class SlavedEventStore(BaseSlavedStore):
DataStore.get_push_action_users_in_range.__func__
)
get_event = DataStore.get_event.__func__
+ get_events = DataStore.get_events.__func__
get_current_state = DataStore.get_current_state.__func__
get_current_state_for_key = DataStore.get_current_state_for_key.__func__
get_rooms_for_user_where_membership_is = (
@@ -95,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore):
get_room_events_stream_for_room = (
DataStore.get_room_events_stream_for_room.__func__
)
+ get_events_around = DataStore.get_events_around.__func__
+ get_state_for_events = DataStore.get_state_for_events.__func__
+ get_state_groups = DataStore.get_state_groups.__func__
_set_before_and_after = DataStore._set_before_and_after
@@ -104,6 +120,7 @@ class SlavedEventStore(BaseSlavedStore):
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
_parse_events_txn = DataStore._parse_events_txn.__func__
_get_events_txn = DataStore._get_events_txn.__func__
+ _get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
_fetch_events_txn = DataStore._fetch_events_txn.__func__
@@ -114,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
_get_members_rows_txn = DataStore._get_members_rows_txn.__func__
+ _get_state_for_groups = DataStore._get_state_for_groups.__func__
+ _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
+ _get_events_around_txn = DataStore._get_events_around_txn.__func__
+ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
@@ -128,7 +149,7 @@ class SlavedEventStore(BaseSlavedStore):
stream = result.get("events")
if stream:
- self._stream_id_gen.advance(stream["position"])
+ self._stream_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
self._process_replication_row(
row, backfilled=False, state_resets=state_resets
@@ -136,7 +157,7 @@ class SlavedEventStore(BaseSlavedStore):
stream = result.get("backfill")
if stream:
- self._backfill_id_gen.advance(-stream["position"])
+ self._backfill_id_gen.advance(-int(stream["position"]))
for row in stream["rows"]:
self._process_replication_row(
row, backfilled=True, state_resets=state_resets
@@ -144,12 +165,14 @@ class SlavedEventStore(BaseSlavedStore):
stream = result.get("forward_ex_outliers")
if stream:
+ self._stream_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
event_id = row[1]
self._invalidate_get_event_cache(event_id)
stream = result.get("backward_ex_outliers")
if stream:
+ self._backfill_id_gen.advance(-int(stream["position"]))
for row in stream["rows"]:
event_id = row[1]
self._invalidate_get_event_cache(event_id)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 8faddb2595..d88206b3bb 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -43,10 +43,10 @@ class SlavedPusherStore(BaseSlavedStore):
def process_replication(self, result):
stream = result.get("pushers")
if stream:
- self._pushers_id_gen.advance(stream["position"])
+ self._pushers_id_gen.advance(int(stream["position"]))
stream = result.get("deleted_pushers")
if stream:
- self._pushers_id_gen.advance(stream["position"])
+ self._pushers_id_gen.advance(int(stream["position"]))
return super(SlavedPusherStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index b55d5dfd08..ec007516d0 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -50,7 +50,7 @@ class SlavedReceiptsStore(BaseSlavedStore):
def process_replication(self, result):
stream = result.get("receipts")
if stream:
- self._receipts_id_gen.advance(stream["position"])
+ self._receipts_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
room_id, receipt_type, user_id = row[1:4]
self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
|