summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-04-29 16:23:08 +0100
committerGitHub <noreply@github.com>2020-04-29 16:23:08 +0100
commit37f6823f5b91f27b9dd8de8fc0e52d5ea889647c (patch)
treedf9a392c3b1d4470185dcfb90e63c1005ef4e4bb
parentDon't relay REMOTE_SERVER_UP cmds to same conn. (#7352) (diff)
downloadsynapse-37f6823f5b91f27b9dd8de8fc0e52d5ea889647c.tar.xz
Add instance name to RDATA/POSITION commands (#7364)
This is primarily for allowing us to send those commands from workers, but for now simply allows us to ignore echoed RDATA/POSITION commands that we sent (we get echoes of sent commands when using redis). Currently we log a WARNING on the master process every time we receive an echoed RDATA.

-rw-r--r--changelog.d/7364.misc1
-rw-r--r--docs/tcp_replication.md41
-rw-r--r--synapse/app/_base.py4
-rw-r--r--synapse/logging/opentracing.py23
-rw-r--r--synapse/replication/tcp/commands.py37
-rw-r--r--synapse/replication/tcp/handler.py17
-rw-r--r--synapse/server.py13
-rw-r--r--synapse/server.pyi2
-rw-r--r--tests/replication/slave/storage/_base.py1
-rw-r--r--tests/replication/tcp/test_commands.py6
10 files changed, 95 insertions, 50 deletions
diff --git a/changelog.d/7364.misc b/changelog.d/7364.misc
new file mode 100644
index 0000000000..bb5d727cf4
--- /dev/null
+++ b/changelog.d/7364.misc
@@ -0,0 +1 @@
+Add an `instance_name` to `RDATA` and `POSITION` replication commands.
diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md
index b922d9cf7e..ab2fffbfe4 100644
--- a/docs/tcp_replication.md
+++ b/docs/tcp_replication.md
@@ -15,15 +15,17 @@ example flow would be (where '>' indicates master to worker and
 
     > SERVER example.com
     < REPLICATE
-    > POSITION events 53
-    > RDATA events 54 ["$foo1:bar.com", ...]
-    > RDATA events 55 ["$foo4:bar.com", ...]
+    > POSITION events master 53
+    > RDATA events master 54 ["$foo1:bar.com", ...]
+    > RDATA events master 55 ["$foo4:bar.com", ...]
 
 The example shows the server accepting a new connection and sending its identity
 with the `SERVER` command, followed by the client server to respond with the
 position of all streams. The server then periodically sends `RDATA` commands
-which have the format `RDATA <stream_name> <token> <row>`, where the format of
-`<row>` is defined by the individual streams.
+which have the format `RDATA <stream_name> <instance_name> <token> <row>`, where
+the format of `<row>` is defined by the individual streams. The
+`<instance_name>` is the name of the Synapse process that generated the data
+(usually "master").
 
 Error reporting happens by either the client or server sending an ERROR
 command, and usually the connection will be closed.
@@ -52,7 +54,7 @@ The basic structure of the protocol is line based, where the initial
 word of each line specifies the command. The rest of the line is parsed
 based on the command. For example, the RDATA command is defined as:
 
-    RDATA <stream_name> <token> <row_json>
+    RDATA <stream_name> <instance_name> <token> <row_json>
 
 (Note that <row_json> may contains spaces, but cannot contain
 newlines.)
@@ -136,11 +138,11 @@ the wire:
     < NAME synapse.app.appservice
     < PING 1490197665618
     < REPLICATE
-    > POSITION events 1
-    > POSITION backfill 1
-    > POSITION caches 1
-    > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
-    > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
+    > POSITION events master 1
+    > POSITION backfill master 1
+    > POSITION caches master 1
+    > RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
+    > RDATA events master 14 ["$149019767112vOHxz:localhost:8823",
         "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
     < PING 1490197675618
     > ERROR server stopping
@@ -151,10 +153,10 @@ position without needing to send data with the `RDATA` command.
 
 An example of a batched set of `RDATA` is:
 
-    > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
-    > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
-    > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
-    > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
+    > RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
+    > RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
+    > RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
+    > RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
 
 In this case the client shouldn't advance their caches token until it
 sees the the last `RDATA`.
@@ -178,6 +180,11 @@ client (C):
    updates, and if so then fetch them out of band. Sent in response to a
    REPLICATE command (but can happen at any time).
 
+   The POSITION command includes the source of the stream. Currently all streams
+   are written by a single process (usually "master"). If fetching missing
+   updates via HTTP API, rather than via the DB, then processes should make the
+   request to the appropriate process.
+
 #### ERROR (S, C)
 
    There was an error
@@ -234,12 +241,12 @@ Each individual cache invalidation results in a row being sent down
 replication, which includes the cache name (the name of the function)
 and they key to invalidate. For example:
 
-    > RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
+    > RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
 
 Alternatively, an entire cache can be invalidated by sending down a `null`
 instead of the key. For example:
 
-    > RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]
+    > RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252]
 
 However, there are times when a number of caches need to be invalidated
 at the same time with the same key. To reduce traffic we batch those
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 4d84f4595a..628292b890 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -270,7 +270,7 @@ def start(hs, listeners=None):
 
         # Start the tracer
         synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
-            hs.config
+            hs
         )
 
         # It is now safe to start your Synapse.
@@ -316,7 +316,7 @@ def setup_sentry(hs):
         scope.set_tag("matrix_server_name", hs.config.server_name)
 
         app = hs.config.worker_app if hs.config.worker_app else "synapse.app.homeserver"
-        name = hs.config.worker_name if hs.config.worker_name else "master"
+        name = hs.get_instance_name()
         scope.set_tag("worker_app", app)
         scope.set_tag("worker_name", name)
 
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 0638cec429..5dddf57008 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -171,7 +171,7 @@ import logging
 import re
 import types
 from functools import wraps
-from typing import Dict
+from typing import TYPE_CHECKING, Dict
 
 from canonicaljson import json
 
@@ -179,6 +179,9 @@ from twisted.internet import defer
 
 from synapse.config import ConfigError
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 # Helper class
 
 
@@ -297,14 +300,11 @@ def _noop_context_manager(*args, **kwargs):
 # Setup
 
 
-def init_tracer(config):
+def init_tracer(hs: "HomeServer"):
     """Set the whitelists and initialise the JaegerClient tracer
-
-    Args:
-        config (HomeserverConfig): The config used by the homeserver
     """
     global opentracing
-    if not config.opentracer_enabled:
+    if not hs.config.opentracer_enabled:
         # We don't have a tracer
         opentracing = None
         return
@@ -315,18 +315,15 @@ def init_tracer(config):
             "installed."
         )
 
-    # Include the worker name
-    name = config.worker_name if config.worker_name else "master"
-
     # Pull out the jaeger config if it was given. Otherwise set it to something sensible.
     # See https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/config.py
 
-    set_homeserver_whitelist(config.opentracer_whitelist)
+    set_homeserver_whitelist(hs.config.opentracer_whitelist)
 
     JaegerConfig(
-        config=config.jaeger_config,
-        service_name="{} {}".format(config.server_name, name),
-        scope_manager=LogContextScopeManager(config),
+        config=hs.config.jaeger_config,
+        service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
+        scope_manager=LogContextScopeManager(hs.config),
     ).initialize_tracer()
 
 
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index c7880d4b63..f58e384d17 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -95,7 +95,7 @@ class RdataCommand(Command):
 
     Format::
 
-        RDATA <stream_name> <token> <row_json>
+        RDATA <stream_name> <instance_name> <token> <row_json>
 
     The `<token>` may either be a numeric stream id OR "batch". The latter case
     is used to support sending multiple updates with the same stream ID. This
@@ -105,33 +105,40 @@ class RdataCommand(Command):
     The client should batch all incoming RDATA with a token of "batch" (per
     stream_name) until it sees an RDATA with a numeric stream ID.
 
+    The `<instance_name>` is the source of the new data (usually "master").
+
     `<token>` of "batch" maps to the instance variable `token` being None.
 
     An example of a batched series of RDATA::
 
-        RDATA presence batch ["@foo:example.com", "online", ...]
-        RDATA presence batch ["@bar:example.com", "online", ...]
-        RDATA presence 59 ["@baz:example.com", "online", ...]
+        RDATA presence master batch ["@foo:example.com", "online", ...]
+        RDATA presence master batch ["@bar:example.com", "online", ...]
+        RDATA presence master 59 ["@baz:example.com", "online", ...]
     """
 
     NAME = "RDATA"
 
-    def __init__(self, stream_name, token, row):
+    def __init__(self, stream_name, instance_name, token, row):
         self.stream_name = stream_name
+        self.instance_name = instance_name
         self.token = token
         self.row = row
 
     @classmethod
     def from_line(cls, line):
-        stream_name, token, row_json = line.split(" ", 2)
+        stream_name, instance_name, token, row_json = line.split(" ", 3)
         return cls(
-            stream_name, None if token == "batch" else int(token), json.loads(row_json)
+            stream_name,
+            instance_name,
+            None if token == "batch" else int(token),
+            json.loads(row_json),
         )
 
     def to_line(self):
         return " ".join(
             (
                 self.stream_name,
+                self.instance_name,
                 str(self.token) if self.token is not None else "batch",
                 _json_encoder.encode(self.row),
             )
@@ -145,23 +152,31 @@ class PositionCommand(Command):
     """Sent by the server to tell the client the stream postition without
     needing to send an RDATA.
 
+    Format::
+
+        POSITION <stream_name> <instance_name> <token>
+
     On receipt of a POSITION command clients should check if they have missed
     any updates, and if so then fetch them out of band.
+
+    The `<instance_name>` is the process that sent the command and is the source
+    of the stream.
     """
 
     NAME = "POSITION"
 
-    def __init__(self, stream_name, token):
+    def __init__(self, stream_name, instance_name, token):
         self.stream_name = stream_name
+        self.instance_name = instance_name
         self.token = token
 
     @classmethod
     def from_line(cls, line):
-        stream_name, token = line.split(" ", 1)
-        return cls(stream_name, int(token))
+        stream_name, instance_name, token = line.split(" ", 2)
+        return cls(stream_name, instance_name, int(token))
 
     def to_line(self):
-        return " ".join((self.stream_name, str(self.token)))
+        return " ".join((self.stream_name, self.instance_name, str(self.token)))
 
 
 class ErrorCommand(_SimpleCommand):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b8f49a8d0f..6f7054d5af 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -79,6 +79,7 @@ class ReplicationCommandHandler:
         self._notifier = hs.get_notifier()
         self._clock = hs.get_clock()
         self._instance_id = hs.get_instance_id()
+        self._instance_name = hs.get_instance_name()
 
         # Set of streams that we've caught up with.
         self._streams_connected = set()  # type: Set[str]
@@ -156,7 +157,7 @@ class ReplicationCommandHandler:
                 hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
             )
         else:
-            client_name = hs.config.worker_name
+            client_name = hs.get_instance_name()
             self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
             host = hs.config.worker_replication_host
             port = hs.config.worker_replication_port
@@ -170,7 +171,9 @@ class ReplicationCommandHandler:
 
         for stream_name, stream in self._streams.items():
             current_token = stream.current_token()
-            self.send_command(PositionCommand(stream_name, current_token))
+            self.send_command(
+                PositionCommand(stream_name, self._instance_name, current_token)
+            )
 
     async def on_USER_SYNC(self, conn: AbstractConnection, cmd: UserSyncCommand):
         user_sync_counter.inc()
@@ -235,6 +238,10 @@ class ReplicationCommandHandler:
             await self._server_notices_sender.on_user_ip(cmd.user_id)
 
     async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
+        if cmd.instance_name == self._instance_name:
+            # Ignore RDATA that are just our own echoes
+            return
+
         stream_name = cmd.stream_name
         inbound_rdata_count.labels(stream_name).inc()
 
@@ -286,6 +293,10 @@ class ReplicationCommandHandler:
         await self._replication_data_handler.on_rdata(stream_name, token, rows)
 
     async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
+        if cmd.instance_name == self._instance_name:
+            # Ignore POSITION that are just our own echoes
+            return
+
         stream = self._streams.get(cmd.stream_name)
         if not stream:
             logger.error("Got POSITION for unknown stream: %s", cmd.stream_name)
@@ -485,7 +496,7 @@ class ReplicationCommandHandler:
 
         We need to check if the client is interested in the stream or not
         """
-        self.send_command(RdataCommand(stream_name, token, data))
+        self.send_command(RdataCommand(stream_name, self._instance_name, token, data))
 
 
 UpdateToken = TypeVar("UpdateToken")
diff --git a/synapse/server.py b/synapse/server.py
index 9d273c980c..bf97a16c09 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -234,7 +234,8 @@ class HomeServer(object):
         self._listening_services = []
         self.start_time = None
 
-        self.instance_id = random_string(5)
+        self._instance_id = random_string(5)
+        self._instance_name = config.worker_name or "master"
 
         self.clock = Clock(reactor)
         self.distributor = Distributor()
@@ -254,7 +255,15 @@ class HomeServer(object):
         This is used to distinguish running instances in worker-based
         deployments.
         """
-        return self.instance_id
+        return self._instance_id
+
+    def get_instance_name(self) -> str:
+        """A unique name for this synapse process.
+
+        Used to identify the process over replication and in config. Does not
+        change over restarts.
+        """
+        return self._instance_name
 
     def setup(self):
         logger.info("Setting up.")
diff --git a/synapse/server.pyi b/synapse/server.pyi
index fc5886f762..18043a2593 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -122,6 +122,8 @@ class HomeServer(object):
         pass
     def get_instance_id(self) -> str:
         pass
+    def get_instance_name(self) -> str:
+        pass
     def get_event_builder_factory(self) -> EventBuilderFactory:
         pass
     def get_storage(self) -> synapse.storage.Storage:
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 395c7d0306..1615dfab5e 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -57,6 +57,7 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
         # We now do some gut wrenching so that we have a client that is based
         # off of the slave store rather than the main store.
         self.replication_handler = ReplicationCommandHandler(self.hs)
+        self.replication_handler._instance_name = "worker"
         self.replication_handler._replication_data_handler = ReplicationDataHandler(
             self.slaved_store
         )
diff --git a/tests/replication/tcp/test_commands.py b/tests/replication/tcp/test_commands.py
index 3cbcb513cc..7ddfd0a733 100644
--- a/tests/replication/tcp/test_commands.py
+++ b/tests/replication/tcp/test_commands.py
@@ -28,15 +28,17 @@ class ParseCommandTestCase(TestCase):
         self.assertIsInstance(cmd, ReplicateCommand)
 
     def test_parse_rdata(self):
-        line = 'RDATA events 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
+        line = 'RDATA events master 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
         cmd = parse_command_from_line(line)
         self.assertIsInstance(cmd, RdataCommand)
         self.assertEqual(cmd.stream_name, "events")
+        self.assertEqual(cmd.instance_name, "master")
         self.assertEqual(cmd.token, 6287863)
 
     def test_parse_rdata_batch(self):
-        line = 'RDATA presence batch ["@foo:example.com", "online"]'
+        line = 'RDATA presence master batch ["@foo:example.com", "online"]'
         cmd = parse_command_from_line(line)
         self.assertIsInstance(cmd, RdataCommand)
         self.assertEqual(cmd.stream_name, "presence")
+        self.assertEqual(cmd.instance_name, "master")
         self.assertIsNone(cmd.token)