summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events.py9
-rw-r--r--synapse/storage/util/id_generators.py53
2 files changed, 60 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py

index 18def01f50..788158199c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -178,6 +178,8 @@ class PersistEventsStore: ) persist_event_counter.inc(len(events_and_contexts)) + logger.debug("Finished persisting 1") + if not backfilled: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. @@ -185,6 +187,8 @@ class PersistEventsStore: events_and_contexts[-1][0].internal_metadata.stream_ordering ) + logger.debug("Finished persisting 2") + for event, context in events_and_contexts: if context.app_service: origin_type = "local" @@ -198,6 +202,8 @@ class PersistEventsStore: event_counter.labels(event.type, origin_type, origin_entity).inc() + logger.debug("Finished persisting 3") + for room_id, new_state in current_state_for_room.items(): self.store.get_current_state_ids.prefill((room_id,), new_state) @@ -206,6 +212,9 @@ class PersistEventsStore: (room_id,), list(latest_event_ids) ) + logger.debug("Finished persisting 4") + logger.debug("Finished persisting 5") + async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]: """Filter the supplied list of event_ids to get those which are prev_events of existing (non-outlier/rejected) events. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 4fd7573e26..f09a68e440 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py
@@ -217,6 +217,7 @@ class MultiWriterIdGenerator: self._instance_name = instance_name self._positive = positive self._writers = writers + self._sequence_name = sequence_name self._return_factor = 1 if positive else -1 # We lock as some functions may be called from DB threads. @@ -227,6 +228,8 @@ class MultiWriterIdGenerator: # return them. self._current_positions = {} # type: Dict[str, int] + self._max_persisted_positions = dict(self._current_positions) + # Set of local IDs that we're still processing. The current position # should be less than the minimum of this set (if not empty). self._unfinished_ids = set() # type: Set[int] @@ -404,6 +407,12 @@ class MultiWriterIdGenerator: current position if possible. """ + logger.debug( + "Mark as finished 1 _current_positions %s: %s", + self._sequence_name, + self._current_positions, + ) + with self._lock: self._unfinished_ids.discard(next_id) self._finished_ids.add(next_id) @@ -439,6 +448,16 @@ class MultiWriterIdGenerator: if new_cur: curr = self._current_positions.get(self._instance_name, 0) self._current_positions[self._instance_name] = max(curr, new_cur) + self._max_persisted_positions[self._instance_name] = max( + self._current_positions[self._instance_name], + self._max_persisted_positions.get(self._instance_name, 0), + ) + + logger.debug( + "Mark as finished _current_positions %s: %s", + self._sequence_name, + self._current_positions, + ) self._add_persisted_position(next_id) @@ -454,6 +473,11 @@ class MultiWriterIdGenerator: """ with self._lock: + logger.debug( + "get_current_token_for_writer %s: %s", + self._sequence_name, + self._current_positions, + ) return self._return_factor * self._current_positions.get(instance_name, 0) def get_positions(self) -> Dict[str, int]: @@ -478,6 +502,12 @@ class MultiWriterIdGenerator: new_id, self._current_positions.get(instance_name, 0) ) + self._max_persisted_positions[instance_name] = max( + new_id, + self._current_positions.get(instance_name, 0), + self._max_persisted_positions.get(instance_name, 0), + ) + self._add_persisted_position(new_id) def get_persisted_upto_position(self) -> int: @@ -492,10 +522,29 @@ class MultiWriterIdGenerator: with self._lock: return self._return_factor * self._persisted_upto_position + def get_max_persisted_position_for_self(self) -> int: + with self._lock: + if self._unfinished_ids: + return self.get_current_token_for_writer(self._instance_name) + + return self._return_factor * max( + self._current_positions.values(), default=1 + ) + + def advance_persisted_to(self, instance_name: str, new_id: int): + new_id *= self._return_factor + + with self._lock: + self._max_persisted_positions[instance_name] = max( + new_id, + self._current_positions.get(instance_name, 0), + self._max_persisted_positions.get(instance_name, 0), + ) + def _add_persisted_position(self, new_id: int): """Record that we have persisted a position. - This is used to keep the `_current_positions` up to date. + This is used to keep the `_persisted_upto_position` up to date. """ # We require that the lock is locked by caller @@ -506,7 +555,7 @@ class MultiWriterIdGenerator: # We move the current min position up if the minimum current positions # of all instances is higher (since by definition all positions less # that that have been persisted). - min_curr = min(self._current_positions.values(), default=0) + min_curr = min(self._max_persisted_positions.values(), default=0) self._persisted_upto_position = max(min_curr, self._persisted_upto_position) # We now iterate through the seen positions, discarding those that are