summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/appservice.py46
-rw-r--r--synapse/app/client_reader.py46
-rw-r--r--synapse/app/federation_reader.py46
-rw-r--r--synapse/app/federation_sender.py49
-rwxr-xr-xsynapse/app/homeserver.py68
-rw-r--r--synapse/app/media_repository.py46
-rw-r--r--synapse/app/pusher.py46
-rw-r--r--synapse/app/synchrotron.py72
8 files changed, 246 insertions, 173 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index dd9ee406a1..1900930053 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -76,7 +76,7 @@ class AppserviceServer(HomeServer):
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         site_tag = listener_config.get("tag", port)
         resources = {}
         for res in listener_config["resources"]:
@@ -85,16 +85,19 @@ class AppserviceServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
         root_resource = create_resource_tree(resources, Resource())
-        reactor.listenTCP(
-            port,
-            SynapseSite(
-                "synapse.access.http.%s" % (site_tag,),
-                site_tag,
-                listener_config,
-                root_resource,
-            ),
-            interface=bind_address
-        )
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
         logger.info("Synapse appservice now listening on port %d", port)
 
     def start_listening(self, listeners):
@@ -102,15 +105,18 @@ class AppserviceServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 0086a2977e..4d081eccd1 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -90,7 +90,7 @@ class ClientReaderServer(HomeServer):
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         site_tag = listener_config.get("tag", port)
         resources = {}
         for res in listener_config["resources"]:
@@ -108,16 +108,19 @@ class ClientReaderServer(HomeServer):
                     })
 
         root_resource = create_resource_tree(resources, Resource())
-        reactor.listenTCP(
-            port,
-            SynapseSite(
-                "synapse.access.http.%s" % (site_tag,),
-                site_tag,
-                listener_config,
-                root_resource,
-            ),
-            interface=bind_address
-        )
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
         logger.info("Synapse client reader now listening on port %d", port)
 
     def start_listening(self, listeners):
@@ -125,15 +128,18 @@ class ClientReaderServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index b5f59a9931..90a4816753 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -86,7 +86,7 @@ class FederationReaderServer(HomeServer):
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         site_tag = listener_config.get("tag", port)
         resources = {}
         for res in listener_config["resources"]:
@@ -99,16 +99,19 @@ class FederationReaderServer(HomeServer):
                     })
 
         root_resource = create_resource_tree(resources, Resource())
-        reactor.listenTCP(
-            port,
-            SynapseSite(
-                "synapse.access.http.%s" % (site_tag,),
-                site_tag,
-                listener_config,
-                root_resource,
-            ),
-            interface=bind_address
-        )
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
         logger.info("Synapse federation reader now listening on port %d", port)
 
     def start_listening(self, listeners):
@@ -116,15 +119,18 @@ class FederationReaderServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 80ea4c8062..411e47d98d 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -30,6 +30,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.util.async import sleep
@@ -56,7 +57,7 @@ logger = logging.getLogger("synapse.app.appservice")
 
 class FederationSenderSlaveStore(
     SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore,
+    SlavedRegistrationStore, SlavedDeviceStore,
 ):
     pass
 
@@ -82,7 +83,7 @@ class FederationSenderServer(HomeServer):
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         site_tag = listener_config.get("tag", port)
         resources = {}
         for res in listener_config["resources"]:
@@ -91,16 +92,19 @@ class FederationSenderServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
         root_resource = create_resource_tree(resources, Resource())
-        reactor.listenTCP(
-            port,
-            SynapseSite(
-                "synapse.access.http.%s" % (site_tag,),
-                site_tag,
-                listener_config,
-                root_resource,
-            ),
-            interface=bind_address
-        )
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
         logger.info("Synapse federation_sender now listening on port %d", port)
 
     def start_listening(self, listeners):
@@ -108,15 +112,18 @@ class FederationSenderServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 54f35900f8..e0b87468fe 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -107,7 +107,7 @@ def build_resource_for_web_client(hs):
 class SynapseHomeServer(HomeServer):
     def _listener_http(self, config, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         tls = listener_config.get("tls", False)
         site_tag = listener_config.get("tag", port)
 
@@ -173,29 +173,32 @@ class SynapseHomeServer(HomeServer):
             root_resource = Resource()
 
         root_resource = create_resource_tree(resources, root_resource)
+
         if tls:
-            reactor.listenSSL(
-                port,
-                SynapseSite(
-                    "synapse.access.https.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                ),
-                self.tls_server_context_factory,
-                interface=bind_address
-            )
+            for address in bind_addresses:
+                reactor.listenSSL(
+                    port,
+                    SynapseSite(
+                        "synapse.access.https.%s" % (site_tag,),
+                        site_tag,
+                        listener_config,
+                        root_resource,
+                    ),
+                    self.tls_server_context_factory,
+                    interface=address
+                )
         else:
-            reactor.listenTCP(
-                port,
-                SynapseSite(
-                    "synapse.access.http.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                ),
-                interface=bind_address
-            )
+            for address in bind_addresses:
+                reactor.listenTCP(
+                    port,
+                    SynapseSite(
+                        "synapse.access.http.%s" % (site_tag,),
+                        site_tag,
+                        listener_config,
+                        root_resource,
+                    ),
+                    interface=address
+                )
         logger.info("Synapse now listening on port %d", port)
 
     def start_listening(self):
@@ -205,15 +208,18 @@ class SynapseHomeServer(HomeServer):
             if listener["type"] == "http":
                 self._listener_http(config, listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 44c19a1bef..ef17b158a5 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -87,7 +87,7 @@ class MediaRepositoryServer(HomeServer):
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         site_tag = listener_config.get("tag", port)
         resources = {}
         for res in listener_config["resources"]:
@@ -105,16 +105,19 @@ class MediaRepositoryServer(HomeServer):
                     })
 
         root_resource = create_resource_tree(resources, Resource())
-        reactor.listenTCP(
-            port,
-            SynapseSite(
-                "synapse.access.http.%s" % (site_tag,),
-                site_tag,
-                listener_config,
-                root_resource,
-            ),
-            interface=bind_address
-        )
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
         logger.info("Synapse media repository now listening on port %d", port)
 
     def start_listening(self, listeners):
@@ -122,15 +125,18 @@ class MediaRepositoryServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index a0e765c54f..073f2c2489 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -121,7 +121,7 @@ class PusherServer(HomeServer):
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         site_tag = listener_config.get("tag", port)
         resources = {}
         for res in listener_config["resources"]:
@@ -130,16 +130,19 @@ class PusherServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
         root_resource = create_resource_tree(resources, Resource())
-        reactor.listenTCP(
-            port,
-            SynapseSite(
-                "synapse.access.http.%s" % (site_tag,),
-                site_tag,
-                listener_config,
-                root_resource,
-            ),
-            interface=bind_address
-        )
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
         logger.info("Synapse pusher now listening on port %d", port)
 
     def start_listening(self, listeners):
@@ -147,15 +150,18 @@ class PusherServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index bf1b995dc2..b3fb408cfd 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -39,6 +39,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
@@ -77,6 +78,7 @@ class SynchrotronSlavedStore(
     SlavedFilteringStore,
     SlavedPresenceStore,
     SlavedDeviceInboxStore,
+    SlavedDeviceStore,
     RoomStore,
     BaseSlavedStore,
     ClientIpStore,  # After BaseSlavedStore because the constructor is different
@@ -289,7 +291,7 @@ class SynchrotronServer(HomeServer):
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
-        bind_address = listener_config.get("bind_address", "")
+        bind_addresses = listener_config["bind_addresses"]
         site_tag = listener_config.get("tag", port)
         resources = {}
         for res in listener_config["resources"]:
@@ -310,16 +312,19 @@ class SynchrotronServer(HomeServer):
                     })
 
         root_resource = create_resource_tree(resources, Resource())
-        reactor.listenTCP(
-            port,
-            SynapseSite(
-                "synapse.access.http.%s" % (site_tag,),
-                site_tag,
-                listener_config,
-                root_resource,
-            ),
-            interface=bind_address
-        )
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
         logger.info("Synapse synchrotron now listening on port %d", port)
 
     def start_listening(self, listeners):
@@ -327,15 +332,18 @@ class SynchrotronServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                reactor.listenTCP(
-                    listener["port"],
-                    manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    ),
-                    interface=listener.get("bind_address", '127.0.0.1')
-                )
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -374,6 +382,27 @@ class SynchrotronServer(HomeServer):
                         stream_key, position, users=users, rooms=rooms
                     )
 
+        @defer.inlineCallbacks
+        def notify_device_list_update(result):
+            stream = result.get("device_lists")
+            if not stream:
+                return
+
+            position_index = stream["field_names"].index("position")
+            user_index = stream["field_names"].index("user_id")
+
+            for row in stream["rows"]:
+                position = row[position_index]
+                user_id = row[user_index]
+
+                rooms = yield store.get_rooms_for_user(user_id)
+                room_ids = [r.room_id for r in rooms]
+
+                notifier.on_new_event(
+                    "device_list_key", position, rooms=room_ids,
+                )
+
+        @defer.inlineCallbacks
         def notify(result):
             stream = result.get("events")
             if stream:
@@ -411,6 +440,7 @@ class SynchrotronServer(HomeServer):
             notify_from_stream(
                 result, "to_device", "to_device_key", user="user_id"
             )
+            yield notify_device_list_update(result)
 
         while True:
             try:
@@ -421,7 +451,7 @@ class SynchrotronServer(HomeServer):
                 yield store.process_replication(result)
                 typing_handler.process_replication(result)
                 yield presence_handler.process_replication(result)
-                notify(result)
+                yield notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(5)