summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2024-02-20 14:31:44 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2024-02-20 14:31:44 +0000
commitbeff0a756ed5dd52583ca38adfb7b5049f25cba5 (patch)
treeddf89fc257cd59c71a63c9f4e5b848a6739ba016 /synapse
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentDon't lock up when joining large rooms (#16903) (diff)
downloadsynapse-beff0a756ed5dd52583ca38adfb7b5049f25cba5.tar.xz
Merge branch 'develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation_event.py26
-rw-r--r--synapse/push/mailer.py23
-rw-r--r--synapse/storage/databases/main/receipts.py21
3 files changed, 58 insertions, 12 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index bde45308d4..83f6a25981 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1757,17 +1757,25 @@ class FederationEventHandler:
 
             events_and_contexts_to_persist.append((event, context))
 
-        for event in sorted_auth_events:
+        for i, event in enumerate(sorted_auth_events):
             await prep(event)
 
-        await self.persist_events_and_notify(
-            room_id,
-            events_and_contexts_to_persist,
-            # Mark these events backfilled as they're historic events that will
-            # eventually be backfilled. For example, missing events we fetch
-            # during backfill should be marked as backfilled as well.
-            backfilled=True,
-        )
+            # The above function is typically not async, and so won't yield to
+            # the reactor. For large rooms let's yield to the reactor
+            # occasionally to ensure we don't block other work.
+            if (i + 1) % 1000 == 0:
+                await self._clock.sleep(0)
+
+        # Also persist the new event in batches for similar reasons as above.
+        for batch in batch_iter(events_and_contexts_to_persist, 1000):
+            await self.persist_events_and_notify(
+                room_id,
+                batch,
+                # Mark these events as backfilled as they're historic events that will
+                # eventually be backfilled. For example, missing events we fetch
+                # during backfill should be marked as backfilled as well.
+                backfilled=True,
+            )
 
     @trace
     async def _check_event_auth(
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 60161e86f2..b4bd88f308 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -26,6 +26,7 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, TypeVar
 import bleach
 import jinja2
 from markupsafe import Markup
+from prometheus_client import Counter
 
 from synapse.api.constants import EventTypes, Membership, RoomTypes
 from synapse.api.errors import StoreError
@@ -56,6 +57,12 @@ logger = logging.getLogger(__name__)
 
 T = TypeVar("T")
 
+emails_sent_counter = Counter(
+    "synapse_emails_sent_total",
+    "Emails sent by type",
+    ["type"],
+)
+
 
 CONTEXT_BEFORE = 1
 CONTEXT_AFTER = 1
@@ -130,6 +137,8 @@ class Mailer:
 
         logger.info("Created Mailer for app_name %s" % app_name)
 
+    emails_sent_counter.labels("password_reset")
+
     async def send_password_reset_mail(
         self, email_address: str, token: str, client_secret: str, sid: str
     ) -> None:
@@ -153,6 +162,8 @@ class Mailer:
 
         template_vars: TemplateVars = {"link": link}
 
+        emails_sent_counter.labels("password_reset").inc()
+
         await self.send_email(
             email_address,
             self.email_subjects.password_reset
@@ -160,6 +171,8 @@ class Mailer:
             template_vars,
         )
 
+    emails_sent_counter.labels("registration")
+
     async def send_registration_mail(
         self, email_address: str, token: str, client_secret: str, sid: str
     ) -> None:
@@ -183,6 +196,8 @@ class Mailer:
 
         template_vars: TemplateVars = {"link": link}
 
+        emails_sent_counter.labels("registration").inc()
+
         await self.send_email(
             email_address,
             self.email_subjects.email_validation
@@ -190,6 +205,8 @@ class Mailer:
             template_vars,
         )
 
+    emails_sent_counter.labels("add_threepid")
+
     async def send_add_threepid_mail(
         self, email_address: str, token: str, client_secret: str, sid: str
     ) -> None:
@@ -214,6 +231,8 @@ class Mailer:
 
         template_vars: TemplateVars = {"link": link}
 
+        emails_sent_counter.labels("add_threepid").inc()
+
         await self.send_email(
             email_address,
             self.email_subjects.email_validation
@@ -221,6 +240,8 @@ class Mailer:
             template_vars,
         )
 
+    emails_sent_counter.labels("notification")
+
     async def send_notification_mail(
         self,
         app_id: str,
@@ -315,6 +336,8 @@ class Mailer:
             "reason": reason,
         }
 
+        emails_sent_counter.labels("notification").inc()
+
         await self.send_email(
             email_address, summary_text, template_vars, unsubscribe_link
         )
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3c7708f5f3..8a426d2875 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -472,9 +472,24 @@ class ReceiptsWorkerStore(SQLBaseStore):
             event_entry = room_event["content"].setdefault(event_id, {})
             receipt_type_dict = event_entry.setdefault(receipt_type, {})
 
-            receipt_type_dict[user_id] = db_to_json(data)
-            if thread_id:
-                receipt_type_dict[user_id]["thread_id"] = thread_id
+            # MSC4102: always replace threaded receipts with unthreaded ones if there is a clash.
+            # Specifically:
+            # - if there is no existing receipt, great, set the data.
+            # - if there is an existing receipt, is it threaded (thread_id present)?
+            #    YES: replace if this receipt has no thread id. NO: do not replace.
+            # This means we will drop some receipts, but MSC4102 is designed to drop semantically
+            # meaningless receipts, so this is okay. Previously, we would drop meaningful data!
+            receipt_data = db_to_json(data)
+            if user_id in receipt_type_dict:  # existing receipt
+                # is the existing receipt threaded and we are currently processing an unthreaded one?
+                if "thread_id" in receipt_type_dict[user_id] and not thread_id:
+                    receipt_type_dict[
+                        user_id
+                    ] = receipt_data  # replace with unthreaded one
+            else:  # receipt does not exist, just set it
+                receipt_type_dict[user_id] = receipt_data
+                if thread_id:
+                    receipt_type_dict[user_id]["thread_id"] = thread_id
 
         results = {
             room_id: [results[room_id]] if room_id in results else []