summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py3
-rw-r--r--synapse/handlers/appservice.py99
-rw-r--r--synapse/handlers/auth.py6
-rw-r--r--synapse/handlers/federation.py9
-rw-r--r--synapse/handlers/presence.py27
5 files changed, 82 insertions, 62 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 1a50a2ec98..63d05f2531 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -19,7 +19,6 @@ from .room import (
 )
 from .room_member import RoomMemberHandler
 from .message import MessageHandler
-from .events import EventStreamHandler, EventHandler
 from .federation import FederationHandler
 from .profile import ProfileHandler
 from .directory import DirectoryHandler
@@ -53,8 +52,6 @@ class Handlers(object):
         self.message_handler = MessageHandler(hs)
         self.room_creation_handler = RoomCreationHandler(hs)
         self.room_member_handler = RoomMemberHandler(hs)
-        self.event_stream_handler = EventStreamHandler(hs)
-        self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index f124590e4a..52e897d8d9 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -16,7 +16,8 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
-from synapse.appservice import ApplicationService
+from synapse.util.metrics import Measure
+from synapse.util.logcontext import preserve_fn
 
 import logging
 
@@ -42,36 +43,60 @@ class ApplicationServicesHandler(object):
         self.appservice_api = hs.get_application_service_api()
         self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
+        self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
-    def notify_interested_services(self, event):
+    def notify_interested_services(self, current_id):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
         prolonged length of time.
 
         Args:
-            event(Event): The event to push out to interested services.
+            current_id(int): The current maximum ID.
         """
-        # Gather interested services
-        services = yield self._get_services_for_event(event)
-        if len(services) == 0:
-            return  # no services need notifying
-
-        # Do we know this user exists? If not, poke the user query API for
-        # all services which match that user regex. This needs to block as these
-        # user queries need to be made BEFORE pushing the event.
-        yield self._check_user_exists(event.sender)
-        if event.type == EventTypes.Member:
-            yield self._check_user_exists(event.state_key)
-
-        if not self.started_scheduler:
-            self.scheduler.start().addErrback(log_failure)
-            self.started_scheduler = True
-
-        # Fork off pushes to these services
-        for service in services:
-            self.scheduler.submit_event_for_as(service, event)
+        services = yield self.store.get_app_services()
+        if not services:
+            return
+
+        with Measure(self.clock, "notify_interested_services"):
+            upper_bound = current_id
+            limit = 100
+            while True:
+                upper_bound, events = yield self.store.get_new_events_for_appservice(
+                    upper_bound, limit
+                )
+
+                logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
+
+                if not events:
+                    break
+
+                for event in events:
+                    # Gather interested services
+                    services = yield self._get_services_for_event(event)
+                    if len(services) == 0:
+                        continue  # no services need notifying
+
+                    # Do we know this user exists? If not, poke the user query API for
+                    # all services which match that user regex. This needs to block as
+                    # these user queries need to be made BEFORE pushing the event.
+                    yield self._check_user_exists(event.sender)
+                    if event.type == EventTypes.Member:
+                        yield self._check_user_exists(event.state_key)
+
+                    if not self.started_scheduler:
+                        self.scheduler.start().addErrback(log_failure)
+                        self.started_scheduler = True
+
+                    # Fork off pushes to these services
+                    for service in services:
+                        preserve_fn(self.scheduler.submit_event_for_as)(service, event)
+
+                yield self.store.set_appservice_last_pos(upper_bound)
+
+                if len(events) < limit:
+                    break
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):
@@ -104,11 +129,12 @@ class ApplicationServicesHandler(object):
             association can be found.
         """
         room_alias_str = room_alias.to_string()
-        alias_query_services = yield self._get_services_for_event(
-            event=None,
-            restrict_to=ApplicationService.NS_ALIASES,
-            alias_list=[room_alias_str]
-        )
+        services = yield self.store.get_app_services()
+        alias_query_services = [
+            s for s in services if (
+                s.is_interested_in_alias(room_alias_str)
+            )
+        ]
         for alias_service in alias_query_services:
             is_known_alias = yield self.appservice_api.query_alias(
                 alias_service, room_alias_str
@@ -136,34 +162,19 @@ class ApplicationServicesHandler(object):
         defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def _get_services_for_event(self, event, restrict_to="", alias_list=None):
+    def _get_services_for_event(self, event):
         """Retrieve a list of application services interested in this event.
 
         Args:
             event(Event): The event to check. Can be None if alias_list is not.
-            restrict_to(str): The namespace to restrict regex tests to.
-            alias_list: A list of aliases to get services for. If None, this
-            list is obtained from the database.
         Returns:
             list<ApplicationService>: A list of services interested in this
             event based on the service regex.
         """
-        member_list = None
-        if hasattr(event, "room_id"):
-            # We need to know the aliases associated with this event.room_id,
-            # if any.
-            if not alias_list:
-                alias_list = yield self.store.get_aliases_for_room(
-                    event.room_id
-                )
-            # We need to know the members associated with this event.room_id,
-            # if any.
-            member_list = yield self.store.get_users_in_room(event.room_id)
-
         services = yield self.store.get_app_services()
         interested_list = [
             s for s in services if (
-                s.is_interested(event, restrict_to, alias_list, member_list)
+                yield s.is_interested(event, self.store)
             )
         ]
         defer.returnValue(interested_list)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index a582d6334b..6986930c0d 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -741,7 +741,7 @@ class AuthHandler(BaseHandler):
     def set_password(self, user_id, newpassword, requester=None):
         password_hash = self.hash(newpassword)
 
-        except_access_token_ids = [requester.access_token_id] if requester else []
+        except_access_token_id = requester.access_token_id if requester else None
 
         try:
             yield self.store.user_set_password_hash(user_id, password_hash)
@@ -750,10 +750,10 @@ class AuthHandler(BaseHandler):
                 raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
             raise e
         yield self.store.user_delete_access_tokens(
-            user_id, except_access_token_ids
+            user_id, except_access_token_id
         )
         yield self.hs.get_pusherpool().remove_pushers_by_user(
-            user_id, except_access_token_ids
+            user_id, except_access_token_id
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ff6bb475b5..328f8f4842 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -274,7 +274,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def backfill(self, dest, room_id, limit, extremities=[]):
+    def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
 
         This will attempt to get more events from the remote. This may return
@@ -284,9 +284,6 @@ class FederationHandler(BaseHandler):
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
 
-        if not extremities:
-            extremities = yield self.store.get_oldest_events_in_room(room_id)
-
         events = yield self.replication_layer.backfill(
             dest,
             room_id,
@@ -455,6 +452,10 @@ class FederationHandler(BaseHandler):
         )
         max_depth = sorted_extremeties_tuple[0][1]
 
+        # We don't want to specify too many extremities as it causes the backfill
+        # request URI to be too long.
+        extremities = dict(sorted_extremeties_tuple[:5])
+
         if current_depth > max_depth:
             logger.debug(
                 "Not backfilling as we don't need to. %d < %d",
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2293b5fdf7..6a1fe76c88 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -503,7 +503,7 @@ class PresenceHandler(object):
         defer.returnValue(states)
 
     @defer.inlineCallbacks
-    def _get_interested_parties(self, states):
+    def _get_interested_parties(self, states, calculate_remote_hosts=True):
         """Given a list of states return which entities (rooms, users, servers)
         are interested in the given states.
 
@@ -526,14 +526,15 @@ class PresenceHandler(object):
             users_to_states.setdefault(state.user_id, []).append(state)
 
         hosts_to_states = {}
-        for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
+        if calculate_remote_hosts:
+            for room_id, states in room_ids_to_states.items():
+                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+                if not local_states:
+                    continue
 
-            hosts = yield self.store.get_joined_hosts_for_room(room_id)
-            for host in hosts:
-                hosts_to_states.setdefault(host, []).extend(local_states)
+                hosts = yield self.store.get_joined_hosts_for_room(room_id)
+                for host in hosts:
+                    hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
             local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
@@ -565,6 +566,16 @@ class PresenceHandler(object):
 
         self._push_to_remotes(hosts_to_states)
 
+    @defer.inlineCallbacks
+    def notify_for_states(self, state, stream_id):
+        parties = yield self._get_interested_parties([state])
+        room_ids_to_states, users_to_states, hosts_to_states = parties
+
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states.keys()]
+        )
+
     def _push_to_remotes(self, hosts_to_states):
         """Sends state updates to remote servers.