diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 0d7108f01b..8670ffbfa3 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -378,6 +378,12 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
self._current_positions.values(), default=1
)
+ if not writers:
+ # If there have been no explicit writers given then any instance can
+ # write to the stream. In which case, let's pre-seed our own
+ # position with the current minimum.
+ self._current_positions[self._instance_name] = self._persisted_upto_position
+
def _load_current_ids(
self,
db_conn: LoggingDatabaseConnection,
@@ -695,24 +701,22 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
heapq.heappush(self._known_persisted_positions, new_id)
- # If we're a writer and we don't have any active writes we update our
- # current position to the latest position seen. This allows the instance
- # to report a recent position when asked, rather than a potentially old
- # one (if this instance hasn't written anything for a while).
- our_current_position = self._current_positions.get(self._instance_name)
- if (
- our_current_position
- and not self._unfinished_ids
- and not self._in_flight_fetches
- ):
- self._current_positions[self._instance_name] = max(
- our_current_position, new_id
- )
-
# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# that that have been persisted).
- min_curr = min(self._current_positions.values(), default=0)
+ our_current_position = self._current_positions.get(self._instance_name, 0)
+ min_curr = min(
+ (
+ token
+ for name, token in self._current_positions.items()
+ if name != self._instance_name
+ ),
+ default=our_current_position,
+ )
+
+ if our_current_position and (self._unfinished_ids or self._in_flight_fetches):
+ min_curr = min(min_curr, our_current_position)
+
self._persisted_upto_position = max(min_curr, self._persisted_upto_position)
# We now iterate through the seen positions, discarding those that are
|