summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7656.bugfix1
-rw-r--r--synapse/replication/slave/storage/account_data.py8
-rw-r--r--synapse/replication/tcp/streams/_base.py10
-rw-r--r--synapse/storage/data_stores/main/account_data.py16
-rw-r--r--synapse/storage/data_stores/main/tags.py3
-rw-r--r--synapse/storage/prepare_database.py1
6 files changed, 35 insertions, 4 deletions
diff --git a/changelog.d/7656.bugfix b/changelog.d/7656.bugfix
new file mode 100644
index 0000000000..1aeddb5fb9
--- /dev/null
+++ b/changelog.d/7656.bugfix
@@ -0,0 +1 @@
+Fix bug in account data replication stream.
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 2a4f5c7cfd..9db6c62bc7 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -24,7 +24,13 @@ from synapse.storage.database import Database
 class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = SlavedIdTracker(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )
 
         super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index d42aaff055..4acefc8a96 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -600,8 +600,14 @@ class AccountDataStream(Stream):
             for stream_id, user_id, room_id, account_data_type in room_results
         )
 
-        # we need to return a sorted list, so merge them together.
-        updates = list(heapq.merge(room_rows, global_rows))
+        # We need to return a sorted list, so merge them together.
+        #
+        # Note: We order only by the stream ID to work around a bug where the
+        # same stream ID could appear in both `global_rows` and `room_rows`,
+        # leading to a comparison between the data tuples. The comparison could
+        # fail due to attempting to compare the `room_id` which results in a
+        # `TypeError` from comparing a `str` vs `None`.
+        updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
         return updates, to_token, limited
 
 
diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py
index f9eef1b78e..b58f04d00d 100644
--- a/synapse/storage/data_stores/main/account_data.py
+++ b/synapse/storage/data_stores/main/account_data.py
@@ -297,7 +297,13 @@ class AccountDataWorkerStore(SQLBaseStore):
 class AccountDataStore(AccountDataWorkerStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = StreamIdGenerator(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data_max_stream_id",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )
 
         super(AccountDataStore, self).__init__(database, db_conn, hs)
@@ -387,6 +393,10 @@ class AccountDataStore(AccountDataWorkerStore):
             # doesn't sound any worse than the whole update getting lost,
             # which is what would happen if we combined the two into one
             # transaction.
+            #
+            # Note: This is only here for backwards compat to allow admins to
+            # roll back to a previous Synapse version. Next time we update the
+            # database version we can remove this table.
             yield self._update_max_stream_id(next_id)
 
             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -405,6 +415,10 @@ class AccountDataStore(AccountDataWorkerStore):
             next_id(int): The the revision to advance to.
         """
 
+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
+
         def _update(txn):
             update_max_id_sql = (
                 "UPDATE account_data_max_stream_id"
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 2aa1bafd48..4219018302 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -233,6 +233,9 @@ class TagsStore(TagsWorkerStore):
             self._account_data_stream_cache.entity_has_changed, user_id, next_id
         )
 
+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
         update_max_id_sql = (
             "UPDATE account_data_max_stream_id"
             " SET stream_id = ?"
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b95434f031..9cc3b51fe6 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -33,6 +33,7 @@ logger = logging.getLogger(__name__)
 # schema files, so the users will be informed on server restarts.
 # XXX: If you're about to bump this to 59 (or higher) please create an update
 # that drops the unused `cache_invalidation_stream` table, as per #7436!
+# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
 SCHEMA_VERSION = 58
 
 dir_path = os.path.abspath(os.path.dirname(__file__))