diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 34e34e5580..c9c48f324f 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,7 +16,7 @@
import synapse
-from synapse.api.constants import EventTypes, PresenceState
+from synapse.api.constants import EventTypes
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -40,15 +40,14 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore
-from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
- PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
@@ -111,7 +110,6 @@ class SynchrotronPresence(object):
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
- self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@@ -124,14 +122,7 @@ class SynchrotronPresence(object):
self.process_id = random_string(16)
logger.info("Presence process_id is %r", self.process_id)
- self._sending_sync = False
- self._need_to_send_sync = False
- self.clock.looping_call(
- self._send_syncing_users_regularly,
- UPDATE_SYNCING_USERS_MS,
- )
-
- reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+ self.sync_callback = None
def set_state(self, user, state, ignore_status_msg=False):
# TODO Hows this supposed to work?
@@ -142,15 +133,15 @@ class SynchrotronPresence(object):
_get_interested_parties = PresenceHandler._get_interested_parties.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__
- @defer.inlineCallbacks
def user_syncing(self, user_id, affect_presence):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
- prev_states = yield self.current_state_for_users([user_id])
- if prev_states[user_id].state == PresenceState.OFFLINE:
- # TODO: Don't block the sync request on this HTTP hit.
- yield self._send_syncing_users_now()
+
+ if self.sync_callback:
+ if self.user_to_num_current_syncs[user_id] == 1:
+ now = self.clock.time_msec()
+ self.sync_callback(user_id, True, now)
def _end():
# We check that the user_id is in user_to_num_current_syncs because
@@ -159,6 +150,11 @@ class SynchrotronPresence(object):
if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
+ if self.sync_callback:
+ if self.user_to_num_current_syncs[user_id] == 0:
+ now = self.clock.time_msec()
+ self.sync_callback(user_id, False, now)
+
@contextlib.contextmanager
def _user_syncing():
try:
@@ -166,49 +162,7 @@ class SynchrotronPresence(object):
finally:
_end()
- defer.returnValue(_user_syncing())
-
- @defer.inlineCallbacks
- def _on_shutdown(self):
- # When the synchrotron is shutdown tell the master to clear the in
- # progress syncs for this process
- self.user_to_num_current_syncs.clear()
- yield self._send_syncing_users_now()
-
- def _send_syncing_users_regularly(self):
- # Only send an update if we aren't in the middle of sending one.
- if not self._sending_sync:
- preserve_fn(self._send_syncing_users_now)()
-
- @defer.inlineCallbacks
- def _send_syncing_users_now(self):
- if self._sending_sync:
- # We don't want to race with sending another update.
- # Instead we wait for that update to finish and send another
- # update afterwards.
- self._need_to_send_sync = True
- return
-
- # Flag that we are sending an update.
- self._sending_sync = True
-
- yield self.http_client.post_json_get_json(self.syncing_users_url, {
- "process_id": self.process_id,
- "syncing_users": [
- user_id for user_id, count in self.user_to_num_current_syncs.items()
- if count > 0
- ],
- })
-
- # Unset the flag as we are no longer sending an update.
- self._sending_sync = False
- if self._need_to_send_sync:
- # If something happened while we were sending the update then
- # we might need to send another update.
- # TODO: Check if the update that was sent matches the current state
- # as we only need to send an update if they are different.
- self._need_to_send_sync = False
- yield self._send_syncing_users_now()
+ return defer.succeed(_user_syncing())
@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
@@ -223,26 +177,24 @@ class SynchrotronPresence(object):
)
@defer.inlineCallbacks
- def process_replication(self, result):
- stream = result.get("presence", {"rows": []})
- states = []
- for row in stream["rows"]:
- (
- position, user_id, state, last_active_ts,
- last_federation_update_ts, last_user_sync_ts, status_msg,
- currently_active
- ) = row
- state = UserPresenceState(
- user_id, state, last_active_ts,
- last_federation_update_ts, last_user_sync_ts, status_msg,
- currently_active
- )
- self.user_to_current_state[user_id] = state
- states.append(state)
+ def process_replication_rows(self, token, rows):
+ states = [UserPresenceState(
+ row.user_id, row.state, row.last_active_ts,
+ row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg,
+ row.currently_active
+ ) for row in rows]
- if states and "position" in stream:
- stream_id = int(stream["position"])
- yield self.notify_from_replication(states, stream_id)
+ for state in states:
+ self.user_to_current_state[row.user_id] = state
+
+ stream_id = token
+ yield self.notify_from_replication(states, stream_id)
+
+ def get_currently_syncing_users(self):
+ return [
+ user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+ if count > 0
+ ]
class SynchrotronTyping(object):
@@ -257,16 +209,13 @@ class SynchrotronTyping(object):
# value which we *must* use for the next replication request.
return {"typing": self._latest_room_serial}
- def process_replication(self, result):
- stream = result.get("typing")
- if stream:
- self._latest_room_serial = int(stream["position"])
+ def process_replication_rows(self, token, rows):
+ self._latest_room_serial = token
- for row in stream["rows"]:
- position, room_id, typing_json = row
- typing = json.loads(typing_json)
- self._room_serials[room_id] = position
- self._room_typing[room_id] = typing
+ for row in rows:
+ typing = json.loads(row.user_ids)
+ self._room_serials[row.room_id] = token
+ self._room_typing[row.room_id] = typing
class SynchrotronApplicationService(object):
@@ -351,124 +300,90 @@ class SynchrotronServer(HomeServer):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
- @defer.inlineCallbacks
- def replicate(self):
- http_client = self.get_simple_http_client()
- store = self.get_datastore()
- replication_url = self.config.worker_replication_url
- notifier = self.get_notifier()
- presence_handler = self.get_presence_handler()
- typing_handler = self.get_typing_handler()
-
- def notify_from_stream(
- result, stream_name, stream_key, room=None, user=None
- ):
- stream = result.get(stream_name)
- if stream:
- position_index = stream["field_names"].index("position")
- if room:
- room_index = stream["field_names"].index(room)
- if user:
- user_index = stream["field_names"].index(user)
-
- users = ()
- rooms = ()
- for row in stream["rows"]:
- position = row[position_index]
-
- if user:
- users = (row[user_index],)
-
- if room:
- rooms = (row[room_index],)
-
- notifier.on_new_event(
- stream_key, position, users=users, rooms=rooms
- )
+ self.get_tcp_replication().start_replication(self)
- @defer.inlineCallbacks
- def notify_device_list_update(result):
- stream = result.get("device_lists")
- if not stream:
- return
+ def build_tcp_replication(self):
+ return SyncReplicationHandler(self)
- position_index = stream["field_names"].index("position")
- user_index = stream["field_names"].index("user_id")
+ def build_presence_handler(self):
+ return SynchrotronPresence(self)
- for row in stream["rows"]:
- position = row[position_index]
- user_id = row[user_index]
+ def build_typing_handler(self):
+ return SynchrotronTyping(self)
- room_ids = yield store.get_rooms_for_user(user_id)
- notifier.on_new_event(
- "device_list_key", position, rooms=room_ids,
- )
+class SyncReplicationHandler(ReplicationClientHandler):
+ def __init__(self, hs):
+ super(SyncReplicationHandler, self).__init__(hs.get_datastore())
- @defer.inlineCallbacks
- def notify(result):
- stream = result.get("events")
- if stream:
- max_position = stream["position"]
-
- event_map = yield store.get_events([row[1] for row in stream["rows"]])
-
- for row in stream["rows"]:
- position = row[0]
- event_id = row[1]
- event = event_map.get(event_id, None)
- if not event:
- continue
-
- extra_users = ()
- if event.type == EventTypes.Member:
- extra_users = (event.state_key,)
- notifier.on_new_room_event(
- event, position, max_position, extra_users
- )
+ self.store = hs.get_datastore()
+ self.typing_handler = hs.get_typing_handler()
+ self.presence_handler = hs.get_presence_handler()
+ self.notifier = hs.get_notifier()
- notify_from_stream(
- result, "push_rules", "push_rules_key", user="user_id"
- )
- notify_from_stream(
- result, "user_account_data", "account_data_key", user="user_id"
- )
- notify_from_stream(
- result, "room_account_data", "account_data_key", user="user_id"
+ self.presence_handler.sync_callback = self.send_user_sync
+
+ def on_rdata(self, stream_name, token, rows):
+ super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+ if stream_name == "typing":
+ self.typing_handler.process_replication_rows(token, rows)
+ elif stream_name == "presence":
+ self.presence_handler.process_replication_rows(token, rows)
+ self.notify(stream_name, token, rows)
+
+ def get_streams_to_replicate(self):
+ args = super(SyncReplicationHandler, self).get_streams_to_replicate()
+ args.update(self.typing_handler.stream_positions())
+ return args
+
+ def get_currently_syncing_users(self):
+ return self.presence_handler.get_currently_syncing_users()
+
+ @defer.inlineCallbacks
+ def notify(self, stream_name, token, rows):
+ if stream_name == "events":
+ # We shouldn't get multiple rows per token for events stream, so
+ # we don't need to optimise this for multiple rows.
+ for row in rows:
+ event = yield self.store.get_event(row.event_id)
+ extra_users = ()
+ if event.type == EventTypes.Member:
+ extra_users = (event.state_key,)
+ max_token = self.store.get_room_max_stream_ordering()
+ self.notifier.on_new_room_event(
+ event, token, max_token, extra_users
+ )
+ elif stream_name == "push_rules":
+ self.notifier.on_new_event(
+ "push_rules_key", token, users=[row.user_id for row in rows],
)
- notify_from_stream(
- result, "tag_account_data", "account_data_key", user="user_id"
+ elif stream_name in ("account_data", "tag_account_data",):
+ self.notifier.on_new_event(
+ "account_data_key", token, users=[row.user_id for row in rows],
)
- notify_from_stream(
- result, "receipts", "receipt_key", room="room_id"
+ elif stream_name == "receipts":
+ self.notifier.on_new_event(
+ "receipt_key", token, rooms=[row.room_id for row in rows],
)
- notify_from_stream(
- result, "typing", "typing_key", room="room_id"
+ elif stream_name == "typing":
+ self.notifier.on_new_event(
+ "typing_key", token, rooms=[row.room_id for row in rows],
)
- notify_from_stream(
- result, "to_device", "to_device_key", user="user_id"
+ elif stream_name == "to_device":
+ entities = [row.entity for row in rows if row.entity.startswith("@")]
+ if entities:
+ self.notifier.on_new_event(
+ "to_device_key", token, users=entities,
+ )
+ elif stream_name == "device_lists":
+ all_room_ids = set()
+ for row in rows:
+ room_ids = yield self.store.get_rooms_for_user(row.user_id)
+ all_room_ids.update(room_ids)
+ self.notifier.on_new_event(
+ "device_list_key", token, rooms=all_room_ids,
)
- yield notify_device_list_update(result)
-
- while True:
- try:
- args = store.stream_positions()
- args.update(typing_handler.stream_positions())
- args["timeout"] = 30000
- result = yield http_client.get_json(replication_url, args=args)
- yield store.process_replication(result)
- typing_handler.process_replication(result)
- yield presence_handler.process_replication(result)
- yield notify(result)
- except:
- logger.exception("Error replicating from %r", replication_url)
- yield sleep(5)
-
- def build_presence_handler(self):
- return SynchrotronPresence(self)
-
- def build_typing_handler(self):
- return SynchrotronTyping(self)
def start(config_options):
@@ -514,7 +429,6 @@ def start(config_options):
def start():
ss.get_datastore().start_profiling()
- ss.replicate()
ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)
|