diff --git a/changelog.d/10107.bugfix b/changelog.d/10107.bugfix
new file mode 100644
index 0000000000..80030efab2
--- /dev/null
+++ b/changelog.d/10107.bugfix
@@ -0,0 +1 @@
+Fixed a bug that could cause Synapse to stop notifying application services. Contributed by Willem Mulder.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 177310f0be..862638cc4f 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -87,7 +87,8 @@ class ApplicationServicesHandler:
self.is_processing = True
try:
limit = 100
- while True:
+ upper_bound = -1
+ while upper_bound < self.current_max:
(
upper_bound,
events,
@@ -95,9 +96,6 @@ class ApplicationServicesHandler:
self.current_max, limit
)
- if not events:
- break
-
events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -153,9 +151,6 @@ class ApplicationServicesHandler:
await self.store.set_appservice_last_pos(upper_bound)
- now = self.clock.time_msec()
- ts = await self.store.get_received_ts(events[-1].event_id)
-
synapse.metrics.event_processing_positions.labels(
"appservice_sender"
).set(upper_bound)
@@ -168,12 +163,16 @@ class ApplicationServicesHandler:
event_processing_loop_counter.labels("appservice_sender").inc()
- synapse.metrics.event_processing_lag.labels(
- "appservice_sender"
- ).set(now - ts)
- synapse.metrics.event_processing_last_ts.labels(
- "appservice_sender"
- ).set(ts)
+ if events:
+ now = self.clock.time_msec()
+ ts = await self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_lag.labels(
+ "appservice_sender"
+ ).set(now - ts)
+ synapse.metrics.event_processing_last_ts.labels(
+ "appservice_sender"
+ ).set(ts)
finally:
self.is_processing = False
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index b037b12a0f..5d6cc2885f 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -57,10 +57,10 @@ class AppServiceHandlerTestCase(unittest.TestCase):
sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
)
self.mock_store.get_new_events_for_appservice.side_effect = [
- make_awaitable((0, [event])),
make_awaitable((0, [])),
+ make_awaitable((1, [event])),
]
- self.handler.notify_interested_services(RoomStreamToken(None, 0))
+ self.handler.notify_interested_services(RoomStreamToken(None, 1))
self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event
@@ -77,7 +77,6 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
- make_awaitable((0, [])),
]
self.handler.notify_interested_services(RoomStreamToken(None, 0))
@@ -95,7 +94,6 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
- make_awaitable((0, [])),
]
self.handler.notify_interested_services(RoomStreamToken(None, 0))
|