diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f3ec2a34ec..53c488d211 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -738,6 +738,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
except Exception:
logger.exception("Error processing replication")
+ async def on_position(self, stream_name: str, instance_name: str, token: int):
+ await super().on_position(stream_name, instance_name, token)
+ # Also call on_rdata to ensure that stream positions are properly reset.
+ await self.on_rdata(stream_name, instance_name, token, [])
+
def stop_pusher(self, user_id, app_id, pushkey):
if not self.notify_pushers:
return
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 8454d74858..93bc45208e 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -24,8 +24,6 @@ import os
import resource
import sys
-from six import iteritems
-
from prometheus_client import Gauge
from twisted.application import service
@@ -525,7 +523,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
- for name, count in iteritems(daily_user_type_results):
+ for name, count in daily_user_type_results.items():
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
@@ -537,7 +535,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
- for name, count in iteritems(r30_results):
+ for name, count in r30_results.items():
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
|