1 files changed, 7 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 273adb61fd..324bd5f879 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -270,6 +270,10 @@ class DeviceWorkerStore(SQLBaseStore):
# The most recent request's opentracing_context is used as the
# context which created the Edu.
+ # This is the stream ID that we will return for the consumer to resume
+ # following this stream later.
+ last_processed_stream_id = from_stream_id
+
query_map = {}
cross_signing_keys_by_user = {}
for user_id, device_id, update_stream_id, update_context in updates:
@@ -295,6 +299,8 @@ class DeviceWorkerStore(SQLBaseStore):
if update_stream_id > previous_update_stream_id:
query_map[key] = (update_stream_id, update_context)
+ last_processed_stream_id = update_stream_id
+
results = await self._get_device_update_edus_by_remote(
destination, from_stream_id, query_map
)
@@ -307,7 +313,7 @@ class DeviceWorkerStore(SQLBaseStore):
# FIXME: remove this when enough servers have upgraded
results.append(("org.matrix.signing_key_update", result))
- return now_stream_id, results
+ return last_processed_stream_id, results
def _get_device_updates_by_remote_txn(
self,
|