Implement cache replication stream
1 files changed, 0 insertions, 13 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3dca1c37a0..207a75d89e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -338,16 +338,10 @@ class SynchrotronServer(HomeServer):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
- clock = self.get_clock()
notifier = self.get_notifier()
presence_handler = self.get_presence_handler()
typing_handler = self.get_typing_handler()
- def expire_broken_caches():
- store.who_forgot_in_room.invalidate_all()
- store.get_presence_list_accepted.invalidate_all()
- store.get_presence_list_observers_accepted.invalidate_all()
-
def notify_from_stream(
result, stream_name, stream_key, room=None, user=None
):
@@ -409,19 +403,12 @@ class SynchrotronServer(HomeServer):
result, "typing", "typing_key", room="room_id"
)
- next_expire_broken_caches_ms = 0
while True:
try:
args = store.stream_positions()
args.update(typing_handler.stream_positions())
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
- now_ms = clock.time_msec()
- if now_ms > next_expire_broken_caches_ms:
- expire_broken_caches()
- next_expire_broken_caches_ms = (
- now_ms + store.BROKEN_CACHE_EXPIRY_MS
- )
yield store.process_replication(result)
typing_handler.process_replication(result)
yield presence_handler.process_replication(result)
|