diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ee41aed69e..f0f89af7dc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,10 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.metrics import (
+ event_processing_loop_counter,
+ event_processing_loop_room_count,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -136,6 +140,12 @@ class ApplicationServicesHandler(object):
events_processed_counter.inc(len(events))
+ event_processing_loop_room_count.labels(
+ "appservice_sender"
+ ).inc(len(events_by_room))
+
+ event_processing_loop_counter.labels("appservice_sender").inc()
+
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 533b82c783..0dffd44e22 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ Membership,
+ RejectedReason,
+)
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -922,6 +927,9 @@ class FederationHandler(BaseHandler):
joinee,
"join",
content,
+ params={
+ "ver": KNOWN_ROOM_VERSIONS,
+ },
)
# This shouldn't happen, because the RoomMemberHandler has a
@@ -1187,13 +1195,14 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
- content={},):
+ content={}, params=None):
origin, pdu = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
membership,
content,
+ params=params,
)
logger.debug("Got response to make_%s: %s", membership, pdu)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 39d7724778..bcb093ba3e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.replication.http.send_event import send_event_to_master
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -171,7 +171,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
- self.http_client = hs.get_simple_http_client()
+ self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -559,12 +559,9 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
- yield send_event_to_master(
- clock=self.hs.get_clock(),
+ yield self.send_event_to_master(
+ event_id=event.event_id,
store=self.store,
- client=self.http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7b7804d9b2..6a17c42238 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -21,9 +21,17 @@ import math
import string
from collections import OrderedDict
+from six import string_types
+
from twisted.internet import defer
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.constants import (
+ DEFAULT_ROOM_VERSION,
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ JoinRules,
+ RoomCreationPreset,
+)
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -99,6 +107,21 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
+ room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+ if not isinstance(room_version, string_types):
+ raise SynapseError(
+ 400,
+ "room_version must be a string",
+ Codes.BAD_JSON,
+ )
+
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ raise SynapseError(
+ 400,
+ "Your homeserver does not support this room version",
+ Codes.UNSUPPORTED_ROOM_VERSION,
+ )
+
if "room_alias_name" in config:
for wchar in string.whitespace:
if wchar in config["room_alias_name"]:
@@ -184,6 +207,9 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
+ # override any attempt to set room versions via the creation_content
+ creation_content["room_version"] = room_version
+
room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 22d8b4b0d3..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,16 +20,24 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import (
- get_or_register_3pid_guest,
- notify_user_membership_change,
- remote_join,
- remote_reject_invite,
+ ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+ ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+ ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+ ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
)
logger = logging.getLogger(__name__)
class RoomMemberWorkerHandler(RoomMemberHandler):
+ def __init__(self, hs):
+ super(RoomMemberWorkerHandler, self).__init__(hs)
+
+ self._get_register_3pid_client = Repl3PID.make_client(hs)
+ self._remote_join_client = ReplRemoteJoin.make_client(hs)
+ self._remote_reject_client = ReplRejectInvite.make_client(hs)
+ self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
@@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
- ret = yield remote_join(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ ret = yield self._remote_join_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
- return remote_reject_invite(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._remote_reject_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="joined",
@@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="left",
@@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest
"""
- return get_or_register_3pid_guest(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._get_register_3pid_client(
requester=requester,
medium=medium,
address=address,
|