diff options
author | Sean Quah <8349537+squahtx@users.noreply.github.com> | 2021-11-04 10:33:53 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-04 10:33:53 +0000 |
commit | 8eec25a1d9d656905db18a2c62a5552e63db2667 (patch) | |
tree | 46f8ad9f72355ea3947cb3bb93811ee708af5245 /synapse/storage/databases | |
parent | Add a linearizer on (appservice, stream) when handling ephemeral events. (#11... (diff) | |
download | synapse-8eec25a1d9d656905db18a2c62a5552e63db2667.tar.xz |
Track ongoing event fetches correctly in the presence of failure (#11240)
When an event fetcher aborts due to an exception, `_event_fetch_ongoing` must be decremented, otherwise the event fetcher would never be replaced. If enough event fetchers were to fail, no more events would be fetched and requests would get stuck waiting for events.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 56 |
1 files changed, 34 insertions, 22 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ae37901be9..c6bf316d5b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -28,6 +28,7 @@ from typing import ( import attr from constantly import NamedConstant, Names +from prometheus_client import Gauge from typing_extensions import Literal from twisted.internet import defer @@ -81,6 +82,12 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +event_fetch_ongoing_gauge = Gauge( + "synapse_event_fetch_ongoing", + "The number of event fetchers that are running", +) + + @attr.s(slots=True, auto_attribs=True) class _EventCacheEntry: event: EventBase @@ -222,6 +229,7 @@ class EventsWorkerStore(SQLBaseStore): self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) # We define this sequence here so that it can be referenced from both # the DataStore and PersistEventStore. @@ -732,28 +740,31 @@ class EventsWorkerStore(SQLBaseStore): """Takes a database connection and waits for requests for events from the _event_fetch_list queue. """ - i = 0 - while True: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if ( - not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING - or single_threaded - or i > EVENT_QUEUE_ITERATIONS - ): - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - self._fetch_event_list(conn, event_list) + try: + i = 0 + while True: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if ( + not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING + or single_threaded + or i > EVENT_QUEUE_ITERATIONS + ): + break + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 + + self._fetch_event_list(conn, event_list) + finally: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) def _fetch_event_list( self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]] @@ -977,6 +988,7 @@ class EventsWorkerStore(SQLBaseStore): if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: self._event_fetch_ongoing += 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) should_start = True else: should_start = False |