diff options
-rw-r--r-- | changelog.d/13585.bugfix | 1 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 13 |
2 files changed, 12 insertions, 2 deletions
diff --git a/changelog.d/13585.bugfix b/changelog.d/13585.bugfix new file mode 100644 index 0000000000..664b986c59 --- /dev/null +++ b/changelog.d/13585.bugfix @@ -0,0 +1 @@ +Fix loading the current stream position behind the actual position. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 3c13859faa..2dfe4c0b66 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -460,8 +460,17 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # Cast safety: this corresponds to the types returned by the query above. rows.extend(cast(Iterable[Tuple[str, int]], cur)) - # Sort so that we handle rows in order for each instance. - rows.sort() + # Sort by stream_id (ascending, lowest -> highest) so that we handle + # rows in order for each instance because we don't want to overwrite + # the current_position of an instance to a lower stream ID than + # we're actually at. + def sort_by_stream_id_key_func(row: Tuple[str, int]) -> int: + (instance, stream_id) = row + # If `stream_id` is ever `None`, we will see a `TypeError: '<' + # not supported between instances of 'NoneType' and 'X'` error. + return stream_id + + rows.sort(key=sort_by_stream_id_key_func) with self._lock: for ( |