diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index ba16f22c91..64edadb624 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -20,20 +20,30 @@ import urllib
from inspect import signature
from typing import Dict, List, Tuple
-from synapse.api.errors import (
- CodeMessageException,
- HttpResponseException,
- RequestSendFailed,
- SynapseError,
-)
+from prometheus_client import Counter, Gauge
+
+from synapse.api.errors import HttpResponseException, SynapseError
+from synapse.http import RequestTimedOutError
from synapse.logging.opentracing import inject_active_span_byte_dict, trace
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
+_pending_outgoing_requests = Gauge(
+ "synapse_pending_outgoing_replication_requests",
+ "Number of active outgoing replication requests, by replication method name",
+ ["name"],
+)
+
+_outgoing_request_counter = Counter(
+ "synapse_outgoing_replication_requests",
+ "Number of outgoing replication requests, by replication method name and result",
+ ["name", "code"],
+)
+
-class ReplicationEndpoint:
+class ReplicationEndpoint(metaclass=abc.ABCMeta):
"""Helper base class for defining new replication HTTP endpoints.
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
@@ -72,8 +82,6 @@ class ReplicationEndpoint:
is received.
"""
- __metaclass__ = abc.ABCMeta
-
NAME = abc.abstractproperty() # type: str # type: ignore
PATH_ARGS = abc.abstractproperty() # type: Tuple[str, ...] # type: ignore
METHOD = "POST"
@@ -140,7 +148,10 @@ class ReplicationEndpoint:
instance_map = hs.config.worker.instance_map
+ outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
+
@trace(opname="outgoing_replication_request")
+ @outgoing_gauge.track_inprogress()
async def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
@@ -195,23 +206,26 @@ class ReplicationEndpoint:
try:
result = await request_func(uri, data, headers=headers)
break
- except CodeMessageException as e:
- if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
+ except RequestTimedOutError:
+ if not cls.RETRY_ON_TIMEOUT:
raise
- logger.warning("%s request timed out", cls.NAME)
+ logger.warning("%s request timed out; retrying", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
+ # on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
+ _outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
- except RequestSendFailed as e:
- raise SynapseError(502, "Failed to talk to master") from e
+ except Exception as e:
+ _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
+ raise SynapseError(502, "Failed to talk to main process") from e
+ _outgoing_request_counter.labels(cls.NAME, 200).inc()
return result
return send_request
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 20f3ba76c0..807b85d2e1 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -53,7 +53,7 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
CACHE = False
def __init__(self, hs):
- super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.device_list_updater = hs.get_device_handler().device_list_updater
self.store = hs.get_datastore()
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 6b56315148..5393b9a9e7 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -57,7 +57,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
PATH_ARGS = ()
def __init__(self, hs):
- super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.storage = hs.get_storage()
@@ -65,10 +65,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
- async def _serialize_payload(store, event_and_contexts, backfilled):
+ async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"""
Args:
store
+ room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
@@ -88,7 +89,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
}
)
- payload = {"events": event_payloads, "backfilled": backfilled}
+ payload = {
+ "events": event_payloads,
+ "backfilled": backfilled,
+ "room_id": room_id,
+ }
return payload
@@ -96,6 +101,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
+ room_id = content["room_id"]
backfilled = content["backfilled"]
event_payloads = content["events"]
@@ -120,7 +126,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
logger.info("Got %d events from federation", len(event_and_contexts))
max_stream_id = await self.federation_handler.persist_events_and_notify(
- event_and_contexts, backfilled
+ room_id, event_and_contexts, backfilled
)
return 200, {"max_stream_id": max_stream_id}
@@ -144,7 +150,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
PATH_ARGS = ("edu_type",)
def __init__(self, hs):
- super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@@ -187,7 +193,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
CACHE = False
def __init__(self, hs):
- super(ReplicationGetQueryRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@@ -230,7 +236,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
PATH_ARGS = ("room_id",)
def __init__(self, hs):
- super(ReplicationCleanRoomRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index fb326bb869..4c81e2d784 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -32,7 +32,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
PATH_ARGS = ("user_id",)
def __init__(self, hs):
- super(RegisterDeviceReplicationServlet, self).__init__(hs)
+ super().__init__(hs)
self.registration_handler = hs.get_registration_handler()
@staticmethod
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 741329ab5f..30680baee8 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Optional
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict, Requester, UserID
-from synapse.util.distributor import user_joined_room, user_left_room
+from synapse.util.distributor import user_left_room
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -45,7 +45,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
PATH_ARGS = ("room_id", "user_id")
def __init__(self, hs):
- super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
@@ -107,7 +107,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
PATH_ARGS = ("invite_event_id",)
def __init__(self, hs: "HomeServer"):
- super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@@ -168,7 +168,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
CACHE = False # No point caching as should return instantly.
def __init__(self, hs):
- super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.registeration_handler = hs.get_registration_handler()
self.store = hs.get_datastore()
@@ -181,9 +181,9 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
Args:
room_id (str)
user_id (str)
- change (str): Either "joined" or "left"
+ change (str): "left"
"""
- assert change in ("joined", "left")
+ assert change == "left"
return {}
@@ -192,9 +192,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
user = UserID.from_string(user_id)
- if change == "joined":
- user_joined_room(self.distributor, user, room_id)
- elif change == "left":
+ if change == "left":
user_left_room(self.distributor, user, room_id)
else:
raise Exception("Unrecognized change: %r", change)
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index a02b27474d..7b12ec9060 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -29,7 +29,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
PATH_ARGS = ("user_id",)
def __init__(self, hs):
- super(ReplicationRegisterServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.registration_handler = hs.get_registration_handler()
@@ -104,7 +104,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
PATH_ARGS = ("user_id",)
def __init__(self, hs):
- super(ReplicationPostRegisterActionsServlet, self).__init__(hs)
+ super().__init__(hs)
self.store = hs.get_datastore()
self.registration_handler = hs.get_registration_handler()
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index f13d452426..9a3a694d5d 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -52,7 +52,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
PATH_ARGS = ("event_id",)
def __init__(self, hs):
- super(ReplicationSendEventRestServlet, self).__init__(hs)
+ super().__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
|