summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py6
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/federation.py8
-rw-r--r--synapse/handlers/message.py8
-rw-r--r--synapse/handlers/presence.py44
-rw-r--r--synapse/handlers/receipts.py11
-rw-r--r--synapse/handlers/room_member.py47
-rw-r--r--synapse/handlers/sync.py16
-rw-r--r--synapse/handlers/typing.py93
9 files changed, 83 insertions, 152 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index f4dbf47c1d..9442ae6f1d 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,12 +24,9 @@ from .message import MessageHandler
 from .events import EventStreamHandler, EventHandler
 from .federation import FederationHandler
 from .profile import ProfileHandler
-from .presence import PresenceHandler
 from .directory import DirectoryHandler
-from .typing import TypingNotificationHandler
 from .admin import AdminHandler
 from .appservice import ApplicationServicesHandler
-from .sync import SyncHandler
 from .auth import AuthHandler
 from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
@@ -53,10 +50,8 @@ class Handlers(object):
         self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
-        self.presence_handler = PresenceHandler(hs)
         self.room_list_handler = RoomListHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
-        self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.receipts_handler = ReceiptsHandler(hs)
         asapi = ApplicationServiceApi(hs)
@@ -67,7 +62,6 @@ class Handlers(object):
                 as_api=asapi
             )
         )
-        self.sync_handler = SyncHandler(hs)
         self.auth_handler = AuthHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f25a252523..3a3a1257d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
         auth_user = UserID.from_string(auth_user_id)
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.hs.get_presence_handler()
 
         context = yield presence_handler.user_syncing(
             auth_user_id, affect_presence=affect_presence,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c21d9d4d83..648a505e65 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -33,7 +33,7 @@ from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
 )
-from synapse.types import UserID, get_domian_from_id
+from synapse.types import UserID, get_domain_from_id
 
 from synapse.events.utils import prune_event
 
@@ -453,7 +453,7 @@ class FederationHandler(BaseHandler):
             joined_domains = {}
             for u, d in joined_users:
                 try:
-                    dom = get_domian_from_id(u)
+                    dom = get_domain_from_id(u)
                     old_d = joined_domains.get(dom)
                     if old_d:
                         joined_domains[dom] = min(d, old_d)
@@ -744,7 +744,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
-                        destinations.add(get_domian_from_id(s.state_key))
+                        destinations.add(get_domain_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
@@ -970,7 +970,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.LEAVE:
-                        destinations.add(get_domian_from_id(s.state_key))
+                        destinations.add(get_domain_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 13154edb78..c41dafdef5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,7 @@ from synapse.events.validator import EventValidator
 from synapse.push.action_generator import ActionGenerator
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id
+    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
 )
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
@@ -236,7 +236,7 @@ class MessageHandler(BaseHandler):
         )
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.hs.get_presence_handler()
             yield presence.bump_presence_active_time(user)
 
     def deduplicate_state_event(self, event, context):
@@ -674,7 +674,7 @@ class MessageHandler(BaseHandler):
             and m.content["membership"] == Membership.JOIN
         ]
 
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.hs.get_presence_handler()
 
         @defer.inlineCallbacks
         def get_presence():
@@ -902,7 +902,7 @@ class MessageHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
-                        destinations.add(get_domian_from_id(s.state_key))
+                        destinations.add(get_domain_from_id(s.state_key))
             except SynapseError:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a8529cce42..37f57301fb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domian_from_id
+from synapse.types import UserID, get_domain_from_id
 import synapse.metrics
 
-from ._base import BaseHandler
-
 import logging
 
 
@@ -73,11 +71,11 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000
 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
-class PresenceHandler(BaseHandler):
+class PresenceHandler(object):
 
     def __init__(self, hs):
-        super(PresenceHandler, self).__init__(hs)
-        self.hs = hs
+        self.is_mine = hs.is_mine
+        self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -138,7 +136,7 @@ class PresenceHandler(BaseHandler):
                 obj=state.user_id,
                 then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
             )
-            if self.hs.is_mine_id(state.user_id):
+            if self.is_mine_id(state.user_id):
                 self.wheel_timer.insert(
                     now=now,
                     obj=state.user_id,
@@ -228,7 +226,7 @@ class PresenceHandler(BaseHandler):
 
                 new_state, should_notify, should_ping = handle_update(
                     prev_state, new_state,
-                    is_mine=self.hs.is_mine_id(user_id),
+                    is_mine=self.is_mine_id(user_id),
                     wheel_timer=self.wheel_timer,
                     now=now
                 )
@@ -287,7 +285,7 @@ class PresenceHandler(BaseHandler):
 
             changes = handle_timeouts(
                 states,
-                is_mine_fn=self.hs.is_mine_id,
+                is_mine_fn=self.is_mine_id,
                 user_to_num_current_syncs=self.user_to_num_current_syncs,
                 now=now,
             )
@@ -427,7 +425,7 @@ class PresenceHandler(BaseHandler):
 
         hosts_to_states = {}
         for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
@@ -436,11 +434,11 @@ class PresenceHandler(BaseHandler):
                 hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
-            host = get_domian_from_id(user_id)
+            host = get_domain_from_id(user_id)
             hosts_to_states.setdefault(host, []).extend(local_states)
 
         # TODO: de-dup hosts_to_states, as a single host might have multiple
@@ -611,14 +609,14 @@ class PresenceHandler(BaseHandler):
         # don't need to send to local clients here, as that is done as part
         # of the event stream/sync.
         # TODO: Only send to servers not already in the room.
-        if self.hs.is_mine(user):
+        if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
             hosts = yield self.store.get_joined_hosts_for_room(room_id)
             self._push_to_remotes({host: (state,) for host in hosts})
         else:
             user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = filter(self.hs.is_mine_id, user_ids)
+            user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
@@ -628,7 +626,7 @@ class PresenceHandler(BaseHandler):
     def get_presence_list(self, observer_user, accepted=None):
         """Returns the presence for all users in their presence list.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         presence_list = yield self.store.get_presence_list(
@@ -659,7 +657,7 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        if self.hs.is_mine(observed_user):
+        if self.is_mine(observed_user):
             yield self.invite_presence(observed_user, observer_user)
         else:
             yield self.federation.send_edu(
@@ -675,11 +673,11 @@ class PresenceHandler(BaseHandler):
     def invite_presence(self, observed_user, observer_user):
         """Handles new presence invites.
         """
-        if not self.hs.is_mine(observed_user):
+        if not self.is_mine(observed_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         # TODO: Don't auto accept
-        if self.hs.is_mine(observer_user):
+        if self.is_mine(observer_user):
             yield self.accept_presence(observed_user, observer_user)
         else:
             self.federation.send_edu(
@@ -742,7 +740,7 @@ class PresenceHandler(BaseHandler):
         Returns:
             A Deferred.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         yield self.store.del_presence_list(
@@ -834,7 +832,11 @@ def _format_user_presence_state(state, now):
 
 class PresenceEventSource(object):
     def __init__(self, hs):
-        self.hs = hs
+        # We can't call get_presence_handler here because there's a cycle:
+        #
+        #   Presence -> Notifier -> PresenceEventSource -> Presence
+        #
+        self.get_presence_handler = hs.get_presence_handler
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
 
@@ -860,7 +862,7 @@ class PresenceEventSource(object):
                 from_key = int(from_key)
             room_ids = room_ids or []
 
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
 
             if not room_ids:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a390a1b8bd..e62722d78d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler):
     def __init__(self, hs):
         super(ReceiptsHandler, self).__init__(hs)
 
+        self.server_name = hs.config.server_name
+        self.store = hs.get_datastore()
         self.hs = hs
         self.federation = hs.get_replication_layer()
         self.federation.register_edu_handler(
@@ -131,12 +133,9 @@ class ReceiptsHandler(BaseHandler):
             event_ids = receipt["event_ids"]
             data = receipt["data"]
 
-            remotedomains = set()
-
-            rm_handler = self.hs.get_handlers().room_member_handler
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=None, remotedomains=remotedomains
-            )
+            remotedomains = yield self.store.get_joined_hosts_for_room(room_id)
+            remotedomains = remotedomains.copy()
+            remotedomains.discard(self.server_name)
 
             logger.debug("Sending receipt to: %r", remotedomains)
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b44e52a515..7e616f44fd 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -56,35 +56,6 @@ class RoomMemberHandler(BaseHandler):
         self.distributor.declare("user_left_room")
 
     @defer.inlineCallbacks
-    def get_room_members(self, room_id):
-        users = yield self.store.get_users_in_room(room_id)
-
-        defer.returnValue([UserID.from_string(u) for u in users])
-
-    @defer.inlineCallbacks
-    def fetch_room_distributions_into(self, room_id, localusers=None,
-                                      remotedomains=None, ignore_user=None):
-        """Fetch the distribution of a room, adding elements to either
-        'localusers' or 'remotedomains', which should be a set() if supplied.
-        If ignore_user is set, ignore that user.
-
-        This function returns nothing; its result is performed by the
-        side-effect on the two passed sets. This allows easy accumulation of
-        member lists of multiple rooms at once if required.
-        """
-        members = yield self.get_room_members(room_id)
-        for member in members:
-            if ignore_user is not None and member == ignore_user:
-                continue
-
-            if self.hs.is_mine(member):
-                if localusers is not None:
-                    localusers.add(member)
-            else:
-                if remotedomains is not None:
-                    remotedomains.add(member.domain)
-
-    @defer.inlineCallbacks
     def _local_membership_update(
         self, requester, target, room_id, membership,
         prev_event_ids,
@@ -427,21 +398,6 @@ class RoomMemberHandler(BaseHandler):
             defer.returnValue(UserID.from_string(invite.sender))
 
     @defer.inlineCallbacks
-    def get_joined_rooms_for_user(self, user):
-        """Returns a list of roomids that the user has any of the given
-        membership states in."""
-
-        rooms = yield self.store.get_rooms_for_user(
-            user.to_string(),
-        )
-
-        # For some reason the list of events contains duplicates
-        # TODO(paul): work out why because I really don't think it should
-        room_ids = set(r.room_id for r in rooms)
-
-        defer.returnValue(room_ids)
-
-    @defer.inlineCallbacks
     def do_3pid_invite(
             self,
             room_id,
@@ -457,8 +413,7 @@ class RoomMemberHandler(BaseHandler):
         )
 
         if invitee:
-            handler = self.hs.get_handlers().room_member_handler
-            yield handler.update_membership(
+            yield self.update_membership(
                 requester,
                 UserID.from_string(invitee),
                 room_id,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 921215469f..9ebfccc8bf 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseHandler
-
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
@@ -133,10 +131,12 @@ class SyncResult(collections.namedtuple("SyncResult", [
         )
 
 
-class SyncHandler(BaseHandler):
+class SyncHandler(object):
 
     def __init__(self, hs):
-        super(SyncHandler, self).__init__(hs)
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+        self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache()
@@ -485,7 +485,6 @@ class SyncHandler(BaseHandler):
             sync_config, now_token, since_token
         )
 
-        rm_handler = self.hs.get_handlers().room_member_handler
         app_service = yield self.store.get_app_service_by_user_id(
             sync_config.user.to_string()
         )
@@ -493,9 +492,10 @@ class SyncHandler(BaseHandler):
             rooms = yield self.store.get_app_service_rooms(app_service)
             joined_room_ids = set(r.room_id for r in rooms)
         else:
-            joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
-                sync_config.user
+            rooms = yield self.store.get_rooms_for_user(
+                sync_config.user.to_string()
             )
+            joined_room_ids = set(r.room_id for r in rooms)
 
         user_id = sync_config.user.to_string()
 
@@ -639,7 +639,7 @@ class SyncHandler(BaseHandler):
 
         # For each newly joined room, we want to send down presence of
         # existing users.
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.presence_handler
         extra_presence_users = set()
         for room_id in newly_joined_rooms:
             users = yield self.store.get_users_in_room(event.room_id)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 8ce27f49ec..d46f05f426 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 
-from ._base import BaseHandler
-
 from synapse.api.errors import SynapseError, AuthError
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.metrics import Measure
@@ -35,11 +33,13 @@ logger = logging.getLogger(__name__)
 RoomMember = namedtuple("RoomMember", ("room_id", "user"))
 
 
-class TypingNotificationHandler(BaseHandler):
+class TypingHandler(object):
     def __init__(self, hs):
-        super(TypingNotificationHandler, self).__init__(hs)
-
-        self.homeserver = hs
+        self.store = hs.get_datastore()
+        self.server_name = hs.config.server_name
+        self.auth = hs.get_auth()
+        self.is_mine = hs.is_mine
+        self.notifier = hs.get_notifier()
 
         self.clock = hs.get_clock()
 
@@ -67,7 +67,7 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def started_typing(self, target_user, auth_user, room_id, timeout):
-        if not self.hs.is_mine(target_user):
+        if not self.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         if target_user != auth_user:
@@ -110,7 +110,7 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def stopped_typing(self, target_user, auth_user, room_id):
-        if not self.hs.is_mine(target_user):
+        if not self.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         if target_user != auth_user:
@@ -132,7 +132,7 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
-        if self.hs.is_mine(user):
+        if self.is_mine(user):
             member = RoomMember(room_id=room_id, user=user)
             yield self._stopped_typing(member)
 
@@ -157,32 +157,26 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _push_update(self, room_id, user, typing):
-        localusers = set()
-        remotedomains = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(
-            room_id, localusers=localusers, remotedomains=remotedomains
-        )
-
-        if localusers:
-            self._push_update_local(
-                room_id=room_id,
-                user=user,
-                typing=typing
-            )
+        domains = yield self.store.get_joined_hosts_for_room(room_id)
 
         deferreds = []
-        for domain in remotedomains:
-            deferreds.append(self.federation.send_edu(
-                destination=domain,
-                edu_type="m.typing",
-                content={
-                    "room_id": room_id,
-                    "user_id": user.to_string(),
-                    "typing": typing,
-                },
-            ))
+        for domain in domains:
+            if domain == self.server_name:
+                self._push_update_local(
+                    room_id=room_id,
+                    user=user,
+                    typing=typing
+                )
+            else:
+                deferreds.append(self.federation.send_edu(
+                    destination=domain,
+                    edu_type="m.typing",
+                    content={
+                        "room_id": room_id,
+                        "user_id": user.to_string(),
+                        "typing": typing,
+                    },
+                ))
 
         yield defer.DeferredList(deferreds, consumeErrors=True)
 
@@ -191,14 +185,9 @@ class TypingNotificationHandler(BaseHandler):
         room_id = content["room_id"]
         user = UserID.from_string(content["user_id"])
 
-        localusers = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(
-            room_id, localusers=localusers
-        )
+        domains = yield self.store.get_joined_hosts_for_room(room_id)
 
-        if localusers:
+        if self.server_name in domains:
             self._push_update_local(
                 room_id=room_id,
                 user=user,
@@ -238,22 +227,14 @@ class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
         self.clock = hs.get_clock()
-        self._handler = None
-        self._room_member_handler = None
-
-    def handler(self):
-        # Avoid cyclic dependency in handler setup
-        if not self._handler:
-            self._handler = self.hs.get_handlers().typing_notification_handler
-        return self._handler
-
-    def room_member_handler(self):
-        if not self._room_member_handler:
-            self._room_member_handler = self.hs.get_handlers().room_member_handler
-        return self._room_member_handler
+        # We can't call get_typing_handler here because there's a cycle:
+        #
+        #   Typing -> Notifier -> TypingNotificationEventSource -> Typing
+        #
+        self.get_typing_handler = hs.get_typing_handler
 
     def _make_event_for(self, room_id):
-        typing = self.handler()._room_typing[room_id]
+        typing = self.get_typing_handler()._room_typing[room_id]
         return {
             "type": "m.typing",
             "room_id": room_id,
@@ -265,7 +246,7 @@ class TypingNotificationEventSource(object):
     def get_new_events(self, from_key, room_ids, **kwargs):
         with Measure(self.clock, "typing.get_new_events"):
             from_key = int(from_key)
-            handler = self.handler()
+            handler = self.get_typing_handler()
 
             events = []
             for room_id in room_ids:
@@ -279,7 +260,7 @@ class TypingNotificationEventSource(object):
             return events, handler._latest_room_serial
 
     def get_current_key(self):
-        return self.handler()._latest_room_serial
+        return self.get_typing_handler()._latest_room_serial
 
     def get_pagination_rows(self, user, pagination_config, key):
         return ([], pagination_config.from_key)