diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index f4dbf47c1d..9442ae6f1d 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,12 +24,9 @@ from .message import MessageHandler
from .events import EventStreamHandler, EventHandler
from .federation import FederationHandler
from .profile import ProfileHandler
-from .presence import PresenceHandler
from .directory import DirectoryHandler
-from .typing import TypingNotificationHandler
from .admin import AdminHandler
from .appservice import ApplicationServicesHandler
-from .sync import SyncHandler
from .auth import AuthHandler
from .identity import IdentityHandler
from .receipts import ReceiptsHandler
@@ -53,10 +50,8 @@ class Handlers(object):
self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
- self.presence_handler = PresenceHandler(hs)
self.room_list_handler = RoomListHandler(hs)
self.directory_handler = DirectoryHandler(hs)
- self.typing_notification_handler = TypingNotificationHandler(hs)
self.admin_handler = AdminHandler(hs)
self.receipts_handler = ReceiptsHandler(hs)
asapi = ApplicationServiceApi(hs)
@@ -67,7 +62,6 @@ class Handlers(object):
as_api=asapi
)
)
- self.sync_handler = SyncHandler(hs)
self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f25a252523..3a3a1257d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
auth_user = UserID.from_string(auth_user_id)
- presence_handler = self.hs.get_handlers().presence_handler
+ presence_handler = self.hs.get_presence_handler()
context = yield presence_handler.user_syncing(
auth_user_id, affect_presence=affect_presence,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c21d9d4d83..648a505e65 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -33,7 +33,7 @@ from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
)
-from synapse.types import UserID, get_domian_from_id
+from synapse.types import UserID, get_domain_from_id
from synapse.events.utils import prune_event
@@ -453,7 +453,7 @@ class FederationHandler(BaseHandler):
joined_domains = {}
for u, d in joined_users:
try:
- dom = get_domian_from_id(u)
+ dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
@@ -744,7 +744,7 @@ class FederationHandler(BaseHandler):
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.JOIN:
- destinations.add(get_domian_from_id(s.state_key))
+ destinations.add(get_domain_from_id(s.state_key))
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
@@ -970,7 +970,7 @@ class FederationHandler(BaseHandler):
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.LEAVE:
- destinations.add(get_domian_from_id(s.state_key))
+ destinations.add(get_domain_from_id(s.state_key))
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 13154edb78..c41dafdef5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,7 @@ from synapse.events.validator import EventValidator
from synapse.push.action_generator import ActionGenerator
from synapse.streams.config import PaginationConfig
from synapse.types import (
- UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id
+ UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
)
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
@@ -236,7 +236,7 @@ class MessageHandler(BaseHandler):
)
if event.type == EventTypes.Message:
- presence = self.hs.get_handlers().presence_handler
+ presence = self.hs.get_presence_handler()
yield presence.bump_presence_active_time(user)
def deduplicate_state_event(self, event, context):
@@ -674,7 +674,7 @@ class MessageHandler(BaseHandler):
and m.content["membership"] == Membership.JOIN
]
- presence_handler = self.hs.get_handlers().presence_handler
+ presence_handler = self.hs.get_presence_handler()
@defer.inlineCallbacks
def get_presence():
@@ -902,7 +902,7 @@ class MessageHandler(BaseHandler):
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.JOIN:
- destinations.add(get_domian_from_id(s.state_key))
+ destinations.add(get_domain_from_id(s.state_key))
except SynapseError:
logger.warn(
"Failed to get destination from event %s", s.event_id
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a8529cce42..37f57301fb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domian_from_id
+from synapse.types import UserID, get_domain_from_id
import synapse.metrics
-from ._base import BaseHandler
-
import logging
@@ -73,11 +71,11 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
-class PresenceHandler(BaseHandler):
+class PresenceHandler(object):
def __init__(self, hs):
- super(PresenceHandler, self).__init__(hs)
- self.hs = hs
+ self.is_mine = hs.is_mine
+ self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
@@ -138,7 +136,7 @@ class PresenceHandler(BaseHandler):
obj=state.user_id,
then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
- if self.hs.is_mine_id(state.user_id):
+ if self.is_mine_id(state.user_id):
self.wheel_timer.insert(
now=now,
obj=state.user_id,
@@ -228,7 +226,7 @@ class PresenceHandler(BaseHandler):
new_state, should_notify, should_ping = handle_update(
prev_state, new_state,
- is_mine=self.hs.is_mine_id(user_id),
+ is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer,
now=now
)
@@ -287,7 +285,7 @@ class PresenceHandler(BaseHandler):
changes = handle_timeouts(
states,
- is_mine_fn=self.hs.is_mine_id,
+ is_mine_fn=self.is_mine_id,
user_to_num_current_syncs=self.user_to_num_current_syncs,
now=now,
)
@@ -427,7 +425,7 @@ class PresenceHandler(BaseHandler):
hosts_to_states = {}
for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
@@ -436,11 +434,11 @@ class PresenceHandler(BaseHandler):
hosts_to_states.setdefault(host, []).extend(local_states)
for user_id, states in users_to_states.items():
- local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
- host = get_domian_from_id(user_id)
+ host = get_domain_from_id(user_id)
hosts_to_states.setdefault(host, []).extend(local_states)
# TODO: de-dup hosts_to_states, as a single host might have multiple
@@ -611,14 +609,14 @@ class PresenceHandler(BaseHandler):
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
- if self.hs.is_mine(user):
+ if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
hosts = yield self.store.get_joined_hosts_for_room(room_id)
self._push_to_remotes({host: (state,) for host in hosts})
else:
user_ids = yield self.store.get_users_in_room(room_id)
- user_ids = filter(self.hs.is_mine_id, user_ids)
+ user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
@@ -628,7 +626,7 @@ class PresenceHandler(BaseHandler):
def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list.
"""
- if not self.hs.is_mine(observer_user):
+ if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
presence_list = yield self.store.get_presence_list(
@@ -659,7 +657,7 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string()
)
- if self.hs.is_mine(observed_user):
+ if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
yield self.federation.send_edu(
@@ -675,11 +673,11 @@ class PresenceHandler(BaseHandler):
def invite_presence(self, observed_user, observer_user):
"""Handles new presence invites.
"""
- if not self.hs.is_mine(observed_user):
+ if not self.is_mine(observed_user):
raise SynapseError(400, "User is not hosted on this Home Server")
# TODO: Don't auto accept
- if self.hs.is_mine(observer_user):
+ if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
self.federation.send_edu(
@@ -742,7 +740,7 @@ class PresenceHandler(BaseHandler):
Returns:
A Deferred.
"""
- if not self.hs.is_mine(observer_user):
+ if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
yield self.store.del_presence_list(
@@ -834,7 +832,11 @@ def _format_user_presence_state(state, now):
class PresenceEventSource(object):
def __init__(self, hs):
- self.hs = hs
+ # We can't call get_presence_handler here because there's a cycle:
+ #
+ # Presence -> Notifier -> PresenceEventSource -> Presence
+ #
+ self.get_presence_handler = hs.get_presence_handler
self.clock = hs.get_clock()
self.store = hs.get_datastore()
@@ -860,7 +862,7 @@ class PresenceEventSource(object):
from_key = int(from_key)
room_ids = room_ids or []
- presence = self.hs.get_handlers().presence_handler
+ presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
if not room_ids:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a390a1b8bd..e62722d78d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler):
def __init__(self, hs):
super(ReceiptsHandler, self).__init__(hs)
+ self.server_name = hs.config.server_name
+ self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_replication_layer()
self.federation.register_edu_handler(
@@ -131,12 +133,9 @@ class ReceiptsHandler(BaseHandler):
event_ids = receipt["event_ids"]
data = receipt["data"]
- remotedomains = set()
-
- rm_handler = self.hs.get_handlers().room_member_handler
- yield rm_handler.fetch_room_distributions_into(
- room_id, localusers=None, remotedomains=remotedomains
- )
+ remotedomains = yield self.store.get_joined_hosts_for_room(room_id)
+ remotedomains = remotedomains.copy()
+ remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b44e52a515..7e616f44fd 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -56,35 +56,6 @@ class RoomMemberHandler(BaseHandler):
self.distributor.declare("user_left_room")
@defer.inlineCallbacks
- def get_room_members(self, room_id):
- users = yield self.store.get_users_in_room(room_id)
-
- defer.returnValue([UserID.from_string(u) for u in users])
-
- @defer.inlineCallbacks
- def fetch_room_distributions_into(self, room_id, localusers=None,
- remotedomains=None, ignore_user=None):
- """Fetch the distribution of a room, adding elements to either
- 'localusers' or 'remotedomains', which should be a set() if supplied.
- If ignore_user is set, ignore that user.
-
- This function returns nothing; its result is performed by the
- side-effect on the two passed sets. This allows easy accumulation of
- member lists of multiple rooms at once if required.
- """
- members = yield self.get_room_members(room_id)
- for member in members:
- if ignore_user is not None and member == ignore_user:
- continue
-
- if self.hs.is_mine(member):
- if localusers is not None:
- localusers.add(member)
- else:
- if remotedomains is not None:
- remotedomains.add(member.domain)
-
- @defer.inlineCallbacks
def _local_membership_update(
self, requester, target, room_id, membership,
prev_event_ids,
@@ -427,21 +398,6 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(UserID.from_string(invite.sender))
@defer.inlineCallbacks
- def get_joined_rooms_for_user(self, user):
- """Returns a list of roomids that the user has any of the given
- membership states in."""
-
- rooms = yield self.store.get_rooms_for_user(
- user.to_string(),
- )
-
- # For some reason the list of events contains duplicates
- # TODO(paul): work out why because I really don't think it should
- room_ids = set(r.room_id for r in rooms)
-
- defer.returnValue(room_ids)
-
- @defer.inlineCallbacks
def do_3pid_invite(
self,
room_id,
@@ -457,8 +413,7 @@ class RoomMemberHandler(BaseHandler):
)
if invitee:
- handler = self.hs.get_handlers().room_member_handler
- yield handler.update_membership(
+ yield self.update_membership(
requester,
UserID.from_string(invitee),
room_id,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 921215469f..9ebfccc8bf 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseHandler
-
from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
@@ -133,10 +131,12 @@ class SyncResult(collections.namedtuple("SyncResult", [
)
-class SyncHandler(BaseHandler):
+class SyncHandler(object):
def __init__(self, hs):
- super(SyncHandler, self).__init__(hs)
+ self.store = hs.get_datastore()
+ self.notifier = hs.get_notifier()
+ self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache()
@@ -485,7 +485,6 @@ class SyncHandler(BaseHandler):
sync_config, now_token, since_token
)
- rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
sync_config.user.to_string()
)
@@ -493,9 +492,10 @@ class SyncHandler(BaseHandler):
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
else:
- joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
- sync_config.user
+ rooms = yield self.store.get_rooms_for_user(
+ sync_config.user.to_string()
)
+ joined_room_ids = set(r.room_id for r in rooms)
user_id = sync_config.user.to_string()
@@ -639,7 +639,7 @@ class SyncHandler(BaseHandler):
# For each newly joined room, we want to send down presence of
# existing users.
- presence_handler = self.hs.get_handlers().presence_handler
+ presence_handler = self.presence_handler
extra_presence_users = set()
for room_id in newly_joined_rooms:
users = yield self.store.get_users_in_room(event.room_id)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 8ce27f49ec..d46f05f426 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,8 +15,6 @@
from twisted.internet import defer
-from ._base import BaseHandler
-
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.metrics import Measure
@@ -35,11 +33,13 @@ logger = logging.getLogger(__name__)
RoomMember = namedtuple("RoomMember", ("room_id", "user"))
-class TypingNotificationHandler(BaseHandler):
+class TypingHandler(object):
def __init__(self, hs):
- super(TypingNotificationHandler, self).__init__(hs)
-
- self.homeserver = hs
+ self.store = hs.get_datastore()
+ self.server_name = hs.config.server_name
+ self.auth = hs.get_auth()
+ self.is_mine = hs.is_mine
+ self.notifier = hs.get_notifier()
self.clock = hs.get_clock()
@@ -67,7 +67,7 @@ class TypingNotificationHandler(BaseHandler):
@defer.inlineCallbacks
def started_typing(self, target_user, auth_user, room_id, timeout):
- if not self.hs.is_mine(target_user):
+ if not self.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")
if target_user != auth_user:
@@ -110,7 +110,7 @@ class TypingNotificationHandler(BaseHandler):
@defer.inlineCallbacks
def stopped_typing(self, target_user, auth_user, room_id):
- if not self.hs.is_mine(target_user):
+ if not self.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")
if target_user != auth_user:
@@ -132,7 +132,7 @@ class TypingNotificationHandler(BaseHandler):
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
- if self.hs.is_mine(user):
+ if self.is_mine(user):
member = RoomMember(room_id=room_id, user=user)
yield self._stopped_typing(member)
@@ -157,32 +157,26 @@ class TypingNotificationHandler(BaseHandler):
@defer.inlineCallbacks
def _push_update(self, room_id, user, typing):
- localusers = set()
- remotedomains = set()
-
- rm_handler = self.homeserver.get_handlers().room_member_handler
- yield rm_handler.fetch_room_distributions_into(
- room_id, localusers=localusers, remotedomains=remotedomains
- )
-
- if localusers:
- self._push_update_local(
- room_id=room_id,
- user=user,
- typing=typing
- )
+ domains = yield self.store.get_joined_hosts_for_room(room_id)
deferreds = []
- for domain in remotedomains:
- deferreds.append(self.federation.send_edu(
- destination=domain,
- edu_type="m.typing",
- content={
- "room_id": room_id,
- "user_id": user.to_string(),
- "typing": typing,
- },
- ))
+ for domain in domains:
+ if domain == self.server_name:
+ self._push_update_local(
+ room_id=room_id,
+ user=user,
+ typing=typing
+ )
+ else:
+ deferreds.append(self.federation.send_edu(
+ destination=domain,
+ edu_type="m.typing",
+ content={
+ "room_id": room_id,
+ "user_id": user.to_string(),
+ "typing": typing,
+ },
+ ))
yield defer.DeferredList(deferreds, consumeErrors=True)
@@ -191,14 +185,9 @@ class TypingNotificationHandler(BaseHandler):
room_id = content["room_id"]
user = UserID.from_string(content["user_id"])
- localusers = set()
-
- rm_handler = self.homeserver.get_handlers().room_member_handler
- yield rm_handler.fetch_room_distributions_into(
- room_id, localusers=localusers
- )
+ domains = yield self.store.get_joined_hosts_for_room(room_id)
- if localusers:
+ if self.server_name in domains:
self._push_update_local(
room_id=room_id,
user=user,
@@ -238,22 +227,14 @@ class TypingNotificationEventSource(object):
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
- self._handler = None
- self._room_member_handler = None
-
- def handler(self):
- # Avoid cyclic dependency in handler setup
- if not self._handler:
- self._handler = self.hs.get_handlers().typing_notification_handler
- return self._handler
-
- def room_member_handler(self):
- if not self._room_member_handler:
- self._room_member_handler = self.hs.get_handlers().room_member_handler
- return self._room_member_handler
+ # We can't call get_typing_handler here because there's a cycle:
+ #
+ # Typing -> Notifier -> TypingNotificationEventSource -> Typing
+ #
+ self.get_typing_handler = hs.get_typing_handler
def _make_event_for(self, room_id):
- typing = self.handler()._room_typing[room_id]
+ typing = self.get_typing_handler()._room_typing[room_id]
return {
"type": "m.typing",
"room_id": room_id,
@@ -265,7 +246,7 @@ class TypingNotificationEventSource(object):
def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
- handler = self.handler()
+ handler = self.get_typing_handler()
events = []
for room_id in room_ids:
@@ -279,7 +260,7 @@ class TypingNotificationEventSource(object):
return events, handler._latest_room_serial
def get_current_key(self):
- return self.handler()._latest_room_serial
+ return self.get_typing_handler()._latest_room_serial
def get_pagination_rows(self, user, pagination_config, key):
return ([], pagination_config.from_key)
|