diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix
new file mode 100644
index 0000000000..7e98d565a1
--- /dev/null
+++ b/changelog.d/11207.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 67f8ffcaff..ddc9105ee9 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -58,6 +59,10 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
+ self._ephemeral_events_linearizer = Linearizer(
+ name="appservice_ephemeral_events"
+ )
+
def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.
@@ -260,26 +265,37 @@ class ApplicationServicesHandler:
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
+ continue
- elif stream_key == "receipt_key":
- events = await self._handle_receipts(service)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(service, events)
-
- # Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
- service, "read_receipt", new_token
+ # Lock per AS/stream, since we read then update its stream position
+ with (
+ await self._ephemeral_events_linearizer.queue(
+ (service.id, stream_key)
)
+ ):
+ if stream_key == "receipt_key":
+ events = await self._handle_receipts(service, new_token)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(
+ service, events
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_type_stream_id_for_appservice(
+ service, "read_receipt", new_token
+ )
- elif stream_key == "presence_key":
- events = await self._handle_presence(service, users)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(service, events)
+ elif stream_key == "presence_key":
+ events = await self._handle_presence(service, users, new_token)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(
+ service, events
+ )
- # Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
- service, "presence", new_token
- )
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_type_stream_id_for_appservice(
+ service, "presence", new_token
+ )
async def _handle_typing(
self, service: ApplicationService, new_token: int
@@ -316,7 +332,9 @@ class ApplicationServicesHandler:
)
return typing
- async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+ async def _handle_receipts(
+ self, service: ApplicationService, new_token: Optional[int]
+ ) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
@@ -335,6 +353,12 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
+ if new_token is not None and new_token <= from_key:
+ logger.debug(
+ "Rejecting token lower than or equal to stored: %s" % (new_token,)
+ )
+ return []
+
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
@@ -342,7 +366,10 @@ class ApplicationServicesHandler:
return receipts
async def _handle_presence(
- self, service: ApplicationService, users: Collection[Union[str, UserID]]
+ self,
+ service: ApplicationService,
+ users: Collection[Union[str, UserID]],
+ new_token: Optional[int],
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
@@ -365,6 +392,12 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
+ if new_token is not None and new_token <= from_key:
+ logger.debug(
+ "Rejecting token lower than or equal to stored: %s" % (new_token,)
+ )
+ return []
+
for user in users:
if isinstance(user, str):
user = UserID.from_string(user)
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 43998020b2..1f6a924452 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -40,6 +40,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
hs.get_application_service_scheduler.return_value = self.mock_scheduler
hs.get_clock.return_value = MockClock()
self.handler = ApplicationServicesHandler(hs)
+ self.event_source = hs.get_event_sources()
def test_notify_interested_services(self):
interested_service = self._mkservice(is_interested=True)
@@ -252,6 +253,56 @@ class AppServiceHandlerTestCase(unittest.TestCase):
},
)
+ def test_notify_interested_services_ephemeral(self):
+ """
+ Test that ephemeral events sent to the appservice handler are scheduled
+ to be pushed out to interested appservices, and that the stream ID is
+ updated accordingly.
+ """
+ interested_service = self._mkservice(is_interested=True)
+ services = [interested_service]
+
+ self.mock_store.get_app_services.return_value = services
+ self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
+ 579
+ )
+
+ event = Mock(event_id="event_1")
+ self.event_source.sources.receipt.get_new_events_as.return_value = (
+ make_awaitable(([event], None))
+ )
+
+ self.handler.notify_interested_services_ephemeral("receipt_key", 580)
+ self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
+ interested_service, [event]
+ )
+ self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
+ interested_service,
+ "read_receipt",
+ 580,
+ )
+
+ def test_notify_interested_services_ephemeral_out_of_order(self):
+ """
+ Test that out-of-order ephemeral events sent to the appservice handler
+ are ignored.
+ """
+ interested_service = self._mkservice(is_interested=True)
+ services = [interested_service]
+
+ self.mock_store.get_app_services.return_value = services
+ self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
+ 580
+ )
+
+ event = Mock(event_id="event_1")
+ self.event_source.sources.receipt.get_new_events_as.return_value = (
+ make_awaitable(([event], None))
+ )
+
+ self.handler.notify_interested_services_ephemeral("receipt_key", 579)
+ self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
+
def _mkservice(self, is_interested, protocols=None):
service = Mock()
service.is_interested.return_value = make_awaitable(is_interested)
|