summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/config/repository.py2
-rw-r--r--synapse/replication/http/register.py18
-rw-r--r--synapse/storage/databases/main/event_push_actions.py72
3 files changed, 77 insertions, 15 deletions
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 1033496bb4..e4759711ed 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -205,7 +205,7 @@ class ContentRepositoryConfig(Config):
         )
         self.url_preview_enabled = config.get("url_preview_enabled", False)
         if self.url_preview_enabled:
-            check_requirements("url_preview")
+            check_requirements("url-preview")
 
             proxy_env = getproxies_environment()
             if "url_preview_ip_range_blacklist" not in config:
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 61abb529c8..976c283360 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -39,6 +39,16 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
         self.store = hs.get_datastores().main
         self.registration_handler = hs.get_registration_handler()
 
+        # Default value if the worker that sent the replication request did not include
+        # an 'approved' property.
+        if (
+            hs.config.experimental.msc3866.enabled
+            and hs.config.experimental.msc3866.require_approval_for_new_accounts
+        ):
+            self._approval_default = False
+        else:
+            self._approval_default = True
+
     @staticmethod
     async def _serialize_payload(  # type: ignore[override]
         user_id: str,
@@ -92,6 +102,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
 
         await self.registration_handler.check_registration_ratelimit(content["address"])
 
+        # Always default admin users to approved (since it means they were created by
+        # an admin).
+        approved_default = self._approval_default
+        if content["admin"]:
+            approved_default = True
+
         await self.registration_handler.register_with_store(
             user_id=user_id,
             password_hash=content["password_hash"],
@@ -103,7 +119,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
             user_type=content["user_type"],
             address=content["address"],
             shadow_banned=content["shadow_banned"],
-            approved=content["approved"],
+            approved=content.get("approved", approved_default),
         )
 
         return 200, {}
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 332e13d1c9..f070e6e88a 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -310,11 +310,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         event_push_actions_done = progress.get("event_push_actions_done", False)
 
         def add_thread_id_txn(
-            txn: LoggingTransaction, table_name: str, start_stream_ordering: int
+            txn: LoggingTransaction, start_stream_ordering: int
         ) -> int:
-            sql = f"""
+            sql = """
             SELECT stream_ordering
-            FROM {table_name}
+            FROM event_push_actions
             WHERE
                 thread_id IS NULL
                 AND stream_ordering > ?
@@ -326,7 +326,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             # No more rows to process.
             rows = txn.fetchall()
             if not rows:
-                progress[f"{table_name}_done"] = True
+                progress["event_push_actions_done"] = True
                 self.db_pool.updates._background_update_progress_txn(
                     txn, "event_push_backfill_thread_id", progress
                 )
@@ -335,16 +335,65 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             # Update the thread ID for any of those rows.
             max_stream_ordering = rows[-1][0]
 
-            sql = f"""
-            UPDATE {table_name}
+            sql = """
+            UPDATE event_push_actions
             SET thread_id = 'main'
-            WHERE stream_ordering <= ? AND thread_id IS NULL
+            WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
             """
-            txn.execute(sql, (max_stream_ordering,))
+            txn.execute(
+                sql,
+                (
+                    start_stream_ordering,
+                    max_stream_ordering,
+                ),
+            )
 
             # Update progress.
             processed_rows = txn.rowcount
-            progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
+            progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "event_push_backfill_thread_id", progress
+            )
+
+            return processed_rows
+
+        def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
+            min_user_id = progress.get("max_summary_user_id", "")
+            min_room_id = progress.get("max_summary_room_id", "")
+
+            # Slightly overcomplicated query for getting the Nth user ID / room
+            # ID tuple, or the last if there are less than N remaining.
+            sql = """
+            SELECT user_id, room_id FROM (
+                SELECT user_id, room_id FROM event_push_summary
+                WHERE (user_id, room_id) > (?, ?)
+                    AND thread_id IS NULL
+                ORDER BY user_id, room_id
+                LIMIT ?
+            ) AS e
+            ORDER BY user_id DESC, room_id DESC
+            LIMIT 1
+            """
+
+            txn.execute(sql, (min_user_id, min_room_id, batch_size))
+            row = txn.fetchone()
+            if not row:
+                return 0
+
+            max_user_id, max_room_id = row
+
+            sql = """
+            UPDATE event_push_summary
+            SET thread_id = 'main'
+            WHERE
+                (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
+                AND thread_id IS NULL
+            """
+            txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
+            processed_rows = txn.rowcount
+
+            progress["max_summary_user_id"] = max_user_id
+            progress["max_summary_room_id"] = max_room_id
             self.db_pool.updates._background_update_progress_txn(
                 txn, "event_push_backfill_thread_id", progress
             )
@@ -360,15 +409,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             result = await self.db_pool.runInteraction(
                 "event_push_backfill_thread_id",
                 add_thread_id_txn,
-                "event_push_actions",
                 progress.get("max_event_push_actions_stream_ordering", 0),
             )
         else:
             result = await self.db_pool.runInteraction(
                 "event_push_backfill_thread_id",
-                add_thread_id_txn,
-                "event_push_summary",
-                progress.get("max_event_push_summary_stream_ordering", 0),
+                add_thread_id_summary_txn,
             )
 
             # Only done after the event_push_summary table is done.