diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 48bc97636c..e3173533e2 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -26,6 +26,7 @@ from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.rest.client.v2_alpha import sync
+from synapse.rest.client.v1 import events
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
@@ -74,11 +75,6 @@ class SynchrotronSlavedStore(
BaseSlavedStore,
ClientIpStore, # After BaseSlavedStore because the constructor is different
):
- # XXX: This is a bit broken because we don't persist forgotten rooms
- # in a way that they can be streamed. This means that we don't have a
- # way to invalidate the forgotten rooms cache correctly.
- # For now we expire the cache every 10 minutes.
- BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
who_forgot_in_room = (
RoomMemberStore.__dict__["who_forgot_in_room"]
)
@@ -89,17 +85,23 @@ class SynchrotronSlavedStore(
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
+ get_presence_list_observers_accepted = PresenceStore.__dict__[
+ "get_presence_list_observers_accepted"
+ ]
+
UPDATE_SYNCING_USERS_MS = 10 * 1000
class SynchrotronPresence(object):
def __init__(self, hs):
+ self.is_mine_id = hs.is_mine_id
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {
@@ -124,6 +126,8 @@ class SynchrotronPresence(object):
pass
get_states = PresenceHandler.get_states.__func__
+ get_state = PresenceHandler.get_state.__func__
+ _get_interested_parties = PresenceHandler._get_interested_parties.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__
@defer.inlineCallbacks
@@ -194,19 +198,39 @@ class SynchrotronPresence(object):
self._need_to_send_sync = False
yield self._send_syncing_users_now()
+ @defer.inlineCallbacks
+ def notify_from_replication(self, states, stream_id):
+ parties = yield self._get_interested_parties(
+ states, calculate_remote_hosts=False
+ )
+ room_ids_to_states, users_to_states, _ = parties
+
+ self.notifier.on_new_event(
+ "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+ users=users_to_states.keys()
+ )
+
+ @defer.inlineCallbacks
def process_replication(self, result):
stream = result.get("presence", {"rows": []})
+ states = []
for row in stream["rows"]:
(
position, user_id, state, last_active_ts,
last_federation_update_ts, last_user_sync_ts, status_msg,
currently_active
) = row
- self.user_to_current_state[user_id] = UserPresenceState(
+ state = UserPresenceState(
user_id, state, last_active_ts,
last_federation_update_ts, last_user_sync_ts, status_msg,
currently_active
)
+ self.user_to_current_state[user_id] = state
+ states.append(state)
+
+ if states and "position" in stream:
+ stream_id = int(stream["position"])
+ yield self.notify_from_replication(states, stream_id)
class SynchrotronTyping(object):
@@ -266,10 +290,12 @@ class SynchrotronServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
sync.register_servlets(self, resource)
+ events.register_servlets(self, resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
"/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
})
root_resource = create_resource_tree(resources, Resource())
@@ -307,15 +333,10 @@ class SynchrotronServer(HomeServer):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
- clock = self.get_clock()
notifier = self.get_notifier()
presence_handler = self.get_presence_handler()
typing_handler = self.get_typing_handler()
- def expire_broken_caches():
- store.who_forgot_in_room.invalidate_all()
- store.get_presence_list_accepted.invalidate_all()
-
def notify_from_stream(
result, stream_name, stream_key, room=None, user=None
):
@@ -377,22 +398,15 @@ class SynchrotronServer(HomeServer):
result, "typing", "typing_key", room="room_id"
)
- next_expire_broken_caches_ms = 0
while True:
try:
args = store.stream_positions()
args.update(typing_handler.stream_positions())
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
- now_ms = clock.time_msec()
- if now_ms > next_expire_broken_caches_ms:
- expire_broken_caches()
- next_expire_broken_caches_ms = (
- now_ms + store.BROKEN_CACHE_EXPIRY_MS
- )
yield store.process_replication(result)
typing_handler.process_replication(result)
- presence_handler.process_replication(result)
+ yield presence_handler.process_replication(result)
notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
|