summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/storage/account_data.py8
-rw-r--r--synapse/replication/tcp/streams/_base.py10
2 files changed, 15 insertions, 3 deletions
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 2a4f5c7cfd..9db6c62bc7 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -24,7 +24,13 @@ from synapse.storage.database import Database
 class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = SlavedIdTracker(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )
 
         super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index d42aaff055..4acefc8a96 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -600,8 +600,14 @@ class AccountDataStream(Stream):
             for stream_id, user_id, room_id, account_data_type in room_results
         )
 
-        # we need to return a sorted list, so merge them together.
-        updates = list(heapq.merge(room_rows, global_rows))
+        # We need to return a sorted list, so merge them together.
+        #
+        # Note: We order only by the stream ID to work around a bug where the
+        # same stream ID could appear in both `global_rows` and `room_rows`,
+        # leading to a comparison between the data tuples. The comparison could
+        # fail due to attempting to compare the `room_id` which results in a
+        # `TypeError` from comparing a `str` vs `None`.
+        updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
         return updates, to_token, limited