diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 18def01f50..788158199c 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -178,6 +178,8 @@ class PersistEventsStore:
)
persist_event_counter.inc(len(events_and_contexts))
+ logger.debug("Finished persisting 1")
+
if not backfilled:
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
@@ -185,6 +187,8 @@ class PersistEventsStore:
events_and_contexts[-1][0].internal_metadata.stream_ordering
)
+ logger.debug("Finished persisting 2")
+
for event, context in events_and_contexts:
if context.app_service:
origin_type = "local"
@@ -198,6 +202,8 @@ class PersistEventsStore:
event_counter.labels(event.type, origin_type, origin_entity).inc()
+ logger.debug("Finished persisting 3")
+
for room_id, new_state in current_state_for_room.items():
self.store.get_current_state_ids.prefill((room_id,), new_state)
@@ -206,6 +212,9 @@ class PersistEventsStore:
(room_id,), list(latest_event_ids)
)
+ logger.debug("Finished persisting 4")
+ logger.debug("Finished persisting 5")
+
async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:
"""Filter the supplied list of event_ids to get those which are prev_events of
existing (non-outlier/rejected) events.
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 4fd7573e26..f09a68e440 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -217,6 +217,7 @@ class MultiWriterIdGenerator:
self._instance_name = instance_name
self._positive = positive
self._writers = writers
+ self._sequence_name = sequence_name
self._return_factor = 1 if positive else -1
# We lock as some functions may be called from DB threads.
@@ -227,6 +228,8 @@ class MultiWriterIdGenerator:
# return them.
self._current_positions = {} # type: Dict[str, int]
+ self._max_persisted_positions = dict(self._current_positions)
+
# Set of local IDs that we're still processing. The current position
# should be less than the minimum of this set (if not empty).
self._unfinished_ids = set() # type: Set[int]
@@ -404,6 +407,12 @@ class MultiWriterIdGenerator:
current position if possible.
"""
+ logger.debug(
+ "Mark as finished 1 _current_positions %s: %s",
+ self._sequence_name,
+ self._current_positions,
+ )
+
with self._lock:
self._unfinished_ids.discard(next_id)
self._finished_ids.add(next_id)
@@ -439,6 +448,16 @@ class MultiWriterIdGenerator:
if new_cur:
curr = self._current_positions.get(self._instance_name, 0)
self._current_positions[self._instance_name] = max(curr, new_cur)
+ self._max_persisted_positions[self._instance_name] = max(
+ self._current_positions[self._instance_name],
+ self._max_persisted_positions.get(self._instance_name, 0),
+ )
+
+ logger.debug(
+ "Mark as finished _current_positions %s: %s",
+ self._sequence_name,
+ self._current_positions,
+ )
self._add_persisted_position(next_id)
@@ -454,6 +473,11 @@ class MultiWriterIdGenerator:
"""
with self._lock:
+ logger.debug(
+ "get_current_token_for_writer %s: %s",
+ self._sequence_name,
+ self._current_positions,
+ )
return self._return_factor * self._current_positions.get(instance_name, 0)
def get_positions(self) -> Dict[str, int]:
@@ -478,6 +502,12 @@ class MultiWriterIdGenerator:
new_id, self._current_positions.get(instance_name, 0)
)
+ self._max_persisted_positions[instance_name] = max(
+ new_id,
+ self._current_positions.get(instance_name, 0),
+ self._max_persisted_positions.get(instance_name, 0),
+ )
+
self._add_persisted_position(new_id)
def get_persisted_upto_position(self) -> int:
@@ -492,10 +522,29 @@ class MultiWriterIdGenerator:
with self._lock:
return self._return_factor * self._persisted_upto_position
+ def get_max_persisted_position_for_self(self) -> int:
+ with self._lock:
+ if self._unfinished_ids:
+ return self.get_current_token_for_writer(self._instance_name)
+
+ return self._return_factor * max(
+ self._current_positions.values(), default=1
+ )
+
+ def advance_persisted_to(self, instance_name: str, new_id: int):
+ new_id *= self._return_factor
+
+ with self._lock:
+ self._max_persisted_positions[instance_name] = max(
+ new_id,
+ self._current_positions.get(instance_name, 0),
+ self._max_persisted_positions.get(instance_name, 0),
+ )
+
def _add_persisted_position(self, new_id: int):
"""Record that we have persisted a position.
- This is used to keep the `_current_positions` up to date.
+ This is used to keep the `_persisted_upto_position` up to date.
"""
# We require that the lock is locked by caller
@@ -506,7 +555,7 @@ class MultiWriterIdGenerator:
# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# that that have been persisted).
- min_curr = min(self._current_positions.values(), default=0)
+ min_curr = min(self._max_persisted_positions.values(), default=0)
self._persisted_upto_position = max(min_curr, self._persisted_upto_position)
# We now iterate through the seen positions, discarding those that are
|