summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py13
-rw-r--r--synapse/handlers/message.py7
-rw-r--r--synapse/handlers/presence.py176
-rw-r--r--synapse/handlers/register.py17
-rw-r--r--synapse/handlers/room.py11
5 files changed, 177 insertions, 47 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9eaf2d3e18..0684778882 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -29,13 +29,7 @@ from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-from synapse.api.constants import (
-    KNOWN_ROOM_VERSIONS,
-    EventTypes,
-    Membership,
-    RejectedReason,
-    RoomVersions,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -44,6 +38,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.event_auth import auth_types_for_event
 from synapse.events.validator import EventValidator
@@ -1733,7 +1728,9 @@ class FederationHandler(BaseHandler):
             # invalid, and it would fail auth checks anyway.
             raise SynapseError(400, "No create event in state")
 
-        room_version = create_event.content.get("room_version", RoomVersions.V1)
+        room_version = create_event.content.get(
+            "room_version", RoomVersions.V1.identifier,
+        )
 
         missing_auth_events = set()
         for e in itertools.chain(auth_events, state, [event]):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 9b41c7b205..8bc7a7678a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -30,6 +30,7 @@ from synapse.api.errors import (
     NotFoundError,
     SynapseError,
 )
+from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -603,7 +604,9 @@ class EventCreationHandler(object):
         """
 
         if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
-            room_version = event.content.get("room_version", RoomVersions.V1)
+            room_version = event.content.get(
+                "room_version", RoomVersions.V1.identifier
+            )
         else:
             room_version = yield self.store.get_room_version(event.room_id)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..e85c49742d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
         self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -132,9 +135,6 @@ class PresenceHandler(object):
             )
         )
 
-        distributor = hs.get_distributor()
-        distributor.observe("user_joined_room", self.user_joined_room)
-
         active_presence = self.store.take_presence_startup_info()
 
         # A dictionary of the current state of users. This is prefilled with
@@ -220,6 +220,15 @@ class PresenceHandler(object):
         LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
                    lambda: len(self.wheel_timer))
 
+        # Used to handle sending of presence to newly joined users/servers
+        if hs.config.use_presence:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+        # Presence is best effort and quickly heals itself, so lets just always
+        # stream from the current state when we restart.
+        self._event_pos = self.store.get_current_events_token()
+        self._event_processing = False
+
     @defer.inlineCallbacks
     def _on_shutdown(self):
         """Gets called when shutting down. This lets us persist any updates that
@@ -751,31 +760,6 @@ class PresenceHandler(object):
         yield self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     @defer.inlineCallbacks
-    def user_joined_room(self, user, room_id):
-        """Called (via the distributor) when a user joins a room. This funciton
-        sends presence updates to servers, either:
-            1. the joining user is a local user and we send their presence to
-               all servers in the room.
-            2. the joining user is a remote user and so we send presence for all
-               local users in the room.
-        """
-        # We only need to send presence to servers that don't have it yet. We
-        # don't need to send to local clients here, as that is done as part
-        # of the event stream/sync.
-        # TODO: Only send to servers not already in the room.
-        if self.is_mine(user):
-            state = yield self.current_state_for_user(user.to_string())
-
-            self._push_to_remotes([state])
-        else:
-            user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, user_ids))
-
-            states = yield self.current_state_for_users(user_ids)
-
-            self._push_to_remotes(list(states.values()))
-
-    @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
         """Returns the presence for all users in their presence list.
         """
@@ -945,6 +929,140 @@ class PresenceHandler(object):
         rows = yield self.store.get_all_presence_updates(last_id, current_id)
         defer.returnValue(rows)
 
+    def notify_new_event(self):
+        """Called when new events have happened. Handles users and servers
+        joining rooms and require being sent presence.
+        """
+
+        if self._event_processing:
+            return
+
+        @defer.inlineCallbacks
+        def _process_presence():
+            assert not self._event_processing
+
+            self._event_processing = True
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._event_processing = False
+
+        run_as_background_process("presence.notify_new_event", _process_presence)
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "presence_delta"):
+                deltas = yield self.store.get_current_state_deltas(self._event_pos)
+                if not deltas:
+                    return
+
+                yield self._handle_state_delta(deltas)
+
+                self._event_pos = deltas[-1]["stream_id"]
+
+                # Expose current event processing position to prometheus
+                synapse.metrics.event_processing_positions.labels("presence").set(
+                    self._event_pos
+                )
+
+    @defer.inlineCallbacks
+    def _handle_state_delta(self, deltas):
+        """Process current state deltas to find new joins that need to be
+        handled.
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            if typ != EventTypes.Member:
+                continue
+
+            event = yield self.store.get_event(event_id)
+            if event.content.get("membership") != Membership.JOIN:
+                # We only care about joins
+                continue
+
+            if prev_event_id:
+                prev_event = yield self.store.get_event(prev_event_id)
+                if prev_event.content.get("membership") == Membership.JOIN:
+                    # Ignore changes to join events.
+                    continue
+
+            yield self._on_user_joined_room(room_id, state_key)
+
+    @defer.inlineCallbacks
+    def _on_user_joined_room(self, room_id, user_id):
+        """Called when we detect a user joining the room via the current state
+        delta stream.
+
+        Args:
+            room_id (str)
+            user_id (str)
+
+        Returns:
+            Deferred
+        """
+
+        if self.is_mine_id(user_id):
+            # If this is a local user then we need to send their presence
+            # out to hosts in the room (who don't already have it)
+
+            # TODO: We should be able to filter the hosts down to those that
+            # haven't previously seen the user
+
+            state = yield self.current_state_for_user(user_id)
+            hosts = yield self.state.get_current_hosts_in_room(room_id)
+
+            # Filter out ourselves.
+            hosts = set(host for host in hosts if host != self.server_name)
+
+            self.federation.send_presence_to_destinations(
+                states=[state],
+                destinations=hosts,
+            )
+        else:
+            # A remote user has joined the room, so we need to:
+            #   1. Check if this is a new server in the room
+            #   2. If so send any presence they don't already have for
+            #      local users in the room.
+
+            # TODO: We should be able to filter the users down to those that
+            # the server hasn't previously seen
+
+            # TODO: Check that this is actually a new server joining the
+            # room.
+
+            user_ids = yield self.state.get_current_user_in_room(room_id)
+            user_ids = list(filter(self.is_mine_id, user_ids))
+
+            states = yield self.current_state_for_users(user_ids)
+
+            # Filter out old presence, i.e. offline presence states where
+            # the user hasn't been active for a week. We can change this
+            # depending on what we want the UX to be, but at the least we
+            # should filter out offline presence where the state is just the
+            # default state.
+            now = self.clock.time_msec()
+            states = [
+                state for state in states.values()
+                if state.state != PresenceState.OFFLINE
+                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+                or state.status_msg is not None
+            ]
+
+            if states:
+                self.federation.send_presence_to_destinations(
+                    states=states,
+                    destinations=[get_domain_from_id(user_id)],
+                )
+
 
 def should_notify(old_state, new_state):
     """Decides if a presence state change should be sent to interested parties.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 58940e0320..a51d11a257 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -153,6 +153,7 @@ class RegistrationHandler(BaseHandler):
         user_type=None,
         default_display_name=None,
         address=None,
+        bind_emails=[],
     ):
         """Registers a new client on the server.
 
@@ -172,6 +173,7 @@ class RegistrationHandler(BaseHandler):
             default_display_name (unicode|None): if set, the new user's displayname
               will be set to this. Defaults to 'localpart'.
             address (str|None): the IP address used to perform the registration.
+            bind_emails (List[str]): list of emails to bind to this account.
         Returns:
             A tuple of (user_id, access_token).
         Raises:
@@ -261,6 +263,21 @@ class RegistrationHandler(BaseHandler):
         if not self.hs.config.user_consent_at_registration:
             yield self._auto_join_rooms(user_id)
 
+        # Bind any specified emails to this account
+        current_time = self.hs.get_clock().time_msec()
+        for email in bind_emails:
+            # generate threepid dict
+            threepid_dict = {
+                "medium": "email",
+                "address": email,
+                "validated_at": current_time,
+            }
+
+            # Bind email to new account
+            yield self._register_email_threepid(
+                user_id, threepid_dict, None, False,
+            )
+
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 67b15697fd..c3dcfec247 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,14 +25,9 @@ from six import iteritems, string_types
 
 from twisted.internet import defer
 
-from synapse.api.constants import (
-    DEFAULT_ROOM_VERSION,
-    KNOWN_ROOM_VERSIONS,
-    EventTypes,
-    JoinRules,
-    RoomCreationPreset,
-)
+from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
+from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -479,7 +474,7 @@ class RoomCreationHandler(BaseHandler):
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+        room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
         if not isinstance(room_version, string_types):
             raise SynapseError(
                 400,