summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/util/id_generators.py53
1 files changed, 51 insertions, 2 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py

index 4fd7573e26..f09a68e440 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py
@@ -217,6 +217,7 @@ class MultiWriterIdGenerator: self._instance_name = instance_name self._positive = positive self._writers = writers + self._sequence_name = sequence_name self._return_factor = 1 if positive else -1 # We lock as some functions may be called from DB threads. @@ -227,6 +228,8 @@ class MultiWriterIdGenerator: # return them. self._current_positions = {} # type: Dict[str, int] + self._max_persisted_positions = dict(self._current_positions) + # Set of local IDs that we're still processing. The current position # should be less than the minimum of this set (if not empty). self._unfinished_ids = set() # type: Set[int] @@ -404,6 +407,12 @@ class MultiWriterIdGenerator: current position if possible. """ + logger.debug( + "Mark as finished 1 _current_positions %s: %s", + self._sequence_name, + self._current_positions, + ) + with self._lock: self._unfinished_ids.discard(next_id) self._finished_ids.add(next_id) @@ -439,6 +448,16 @@ class MultiWriterIdGenerator: if new_cur: curr = self._current_positions.get(self._instance_name, 0) self._current_positions[self._instance_name] = max(curr, new_cur) + self._max_persisted_positions[self._instance_name] = max( + self._current_positions[self._instance_name], + self._max_persisted_positions.get(self._instance_name, 0), + ) + + logger.debug( + "Mark as finished _current_positions %s: %s", + self._sequence_name, + self._current_positions, + ) self._add_persisted_position(next_id) @@ -454,6 +473,11 @@ class MultiWriterIdGenerator: """ with self._lock: + logger.debug( + "get_current_token_for_writer %s: %s", + self._sequence_name, + self._current_positions, + ) return self._return_factor * self._current_positions.get(instance_name, 0) def get_positions(self) -> Dict[str, int]: @@ -478,6 +502,12 @@ class MultiWriterIdGenerator: new_id, self._current_positions.get(instance_name, 0) ) + self._max_persisted_positions[instance_name] = max( + new_id, + self._current_positions.get(instance_name, 0), + self._max_persisted_positions.get(instance_name, 0), + ) + self._add_persisted_position(new_id) def get_persisted_upto_position(self) -> int: @@ -492,10 +522,29 @@ class MultiWriterIdGenerator: with self._lock: return self._return_factor * self._persisted_upto_position + def get_max_persisted_position_for_self(self) -> int: + with self._lock: + if self._unfinished_ids: + return self.get_current_token_for_writer(self._instance_name) + + return self._return_factor * max( + self._current_positions.values(), default=1 + ) + + def advance_persisted_to(self, instance_name: str, new_id: int): + new_id *= self._return_factor + + with self._lock: + self._max_persisted_positions[instance_name] = max( + new_id, + self._current_positions.get(instance_name, 0), + self._max_persisted_positions.get(instance_name, 0), + ) + def _add_persisted_position(self, new_id: int): """Record that we have persisted a position. - This is used to keep the `_current_positions` up to date. + This is used to keep the `_persisted_upto_position` up to date. """ # We require that the lock is locked by caller @@ -506,7 +555,7 @@ class MultiWriterIdGenerator: # We move the current min position up if the minimum current positions # of all instances is higher (since by definition all positions less # that that have been persisted). - min_curr = min(self._current_positions.values(), default=0) + min_curr = min(self._max_persisted_positions.values(), default=0) self._persisted_upto_position = max(min_curr, self._persisted_upto_position) # We now iterate through the seen positions, discarding those that are