diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index bf1b995dc2..b3fb408cfd 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -39,6 +39,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
@@ -77,6 +78,7 @@ class SynchrotronSlavedStore(
SlavedFilteringStore,
SlavedPresenceStore,
SlavedDeviceInboxStore,
+ SlavedDeviceStore,
RoomStore,
BaseSlavedStore,
ClientIpStore, # After BaseSlavedStore because the constructor is different
@@ -289,7 +291,7 @@ class SynchrotronServer(HomeServer):
def _listen_http(self, listener_config):
port = listener_config["port"]
- bind_address = listener_config.get("bind_address", "")
+ bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
@@ -310,16 +312,19 @@ class SynchrotronServer(HomeServer):
})
root_resource = create_resource_tree(resources, Resource())
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=bind_address
- )
+
+ for address in bind_addresses:
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=address
+ )
+
logger.info("Synapse synchrotron now listening on port %d", port)
def start_listening(self, listeners):
@@ -327,15 +332,18 @@ class SynchrotronServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=listener.get("bind_address", '127.0.0.1')
- )
+ bind_addresses = listener["bind_addresses"]
+
+ for address in bind_addresses:
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=address
+ )
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -374,6 +382,27 @@ class SynchrotronServer(HomeServer):
stream_key, position, users=users, rooms=rooms
)
+ @defer.inlineCallbacks
+ def notify_device_list_update(result):
+ stream = result.get("device_lists")
+ if not stream:
+ return
+
+ position_index = stream["field_names"].index("position")
+ user_index = stream["field_names"].index("user_id")
+
+ for row in stream["rows"]:
+ position = row[position_index]
+ user_id = row[user_index]
+
+ rooms = yield store.get_rooms_for_user(user_id)
+ room_ids = [r.room_id for r in rooms]
+
+ notifier.on_new_event(
+ "device_list_key", position, rooms=room_ids,
+ )
+
+ @defer.inlineCallbacks
def notify(result):
stream = result.get("events")
if stream:
@@ -411,6 +440,7 @@ class SynchrotronServer(HomeServer):
notify_from_stream(
result, "to_device", "to_device_key", user="user_id"
)
+ yield notify_device_list_update(result)
while True:
try:
@@ -421,7 +451,7 @@ class SynchrotronServer(HomeServer):
yield store.process_replication(result)
typing_handler.process_replication(result)
yield presence_handler.process_replication(result)
- notify(result)
+ yield notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(5)
|