diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 10 | ||||
-rw-r--r-- | synapse/handlers/message.py | 11 | ||||
-rw-r--r-- | synapse/handlers/room_member_worker.py | 41 |
3 files changed, 31 insertions, 31 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ee41aed69e..f0f89af7dc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,6 +23,10 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.metrics import ( + event_processing_loop_counter, + event_processing_loop_room_count, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -136,6 +140,12 @@ class ApplicationServicesHandler(object): events_processed_counter.inc(len(events)) + event_processing_loop_room_count.labels( + "appservice_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("appservice_sender").inc() + synapse.metrics.event_processing_lag.labels( "appservice_sender").set(now - ts) synapse.metrics.event_processing_last_ts.labels( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 39d7724778..bcb093ba3e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator -from synapse.replication.http.send_event import send_event_to_master +from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.types import RoomAlias, UserID from synapse.util.async import Linearizer from synapse.util.frozenutils import frozendict_json_encoder @@ -171,7 +171,7 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config - self.http_client = hs.get_simple_http_client() + self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) @@ -559,12 +559,9 @@ class EventCreationHandler(object): try: # If we're a worker we need to hit out to the master. if self.config.worker_app: - yield send_event_to_master( - clock=self.hs.get_clock(), + yield self.send_event_to_master( + event_id=event.event_id, store=self.store, - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, requester=requester, event=event, context=context, diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 22d8b4b0d3..acc6eb8099 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -20,16 +20,24 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler from synapse.replication.http.membership import ( - get_or_register_3pid_guest, - notify_user_membership_change, - remote_join, - remote_reject_invite, + ReplicationRegister3PIDGuestRestServlet as Repl3PID, + ReplicationRemoteJoinRestServlet as ReplRemoteJoin, + ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite, + ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft, ) logger = logging.getLogger(__name__) class RoomMemberWorkerHandler(RoomMemberHandler): + def __init__(self, hs): + super(RoomMemberWorkerHandler, self).__init__(hs) + + self._get_register_3pid_client = Repl3PID.make_client(hs) + self._remote_join_client = ReplRemoteJoin.make_client(hs) + self._remote_reject_client = ReplRejectInvite.make_client(hs) + self._notify_change_client = ReplJoinedLeft.make_client(hs) + @defer.inlineCallbacks def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join @@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") - ret = yield remote_join( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + ret = yield self._remote_join_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): """Implements RoomMemberHandler._remote_reject_invite """ - return remote_reject_invite( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._remote_reject_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _user_joined_room(self, target, room_id): """Implements RoomMemberHandler._user_joined_room """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="joined", @@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _user_left_room(self, target, room_id): """Implements RoomMemberHandler._user_left_room """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left", @@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): """Implements RoomMemberHandler.get_or_register_3pid_guest """ - return get_or_register_3pid_guest( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._get_register_3pid_client( requester=requester, medium=medium, address=address, |