diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 53f9296399..2d9126dd86 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -28,7 +28,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import (
- PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+ preserve_fn, preserve_context_over_deferred
)
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
@@ -394,11 +394,10 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -916,11 +915,10 @@ class FederationHandler(BaseHandler):
origin, auth_chain, state, event
)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[joinee]
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=[joinee]
+ )
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
@@ -1035,10 +1033,9 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
@@ -1084,11 +1081,10 @@ class FederationHandler(BaseHandler):
)
target_user = UserID.from_string(event.state_key)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=[target_user],
+ )
defer.returnValue(event)
@@ -1246,10 +1242,9 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
defer.returnValue(None)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7a498af5a2..348056add5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -612,7 +612,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
- yield self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1ede117c79..f3707afcd0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.async import Linearizer
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -187,6 +188,7 @@ class PresenceHandler(object):
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
+ self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
@@ -316,11 +318,7 @@ class PresenceHandler(object):
if to_federation_ping:
federation_presence_out_counter.inc_by(len(to_federation_ping))
- _, _, hosts_to_states = yield self._get_interested_parties(
- to_federation_ping.values()
- )
-
- self._push_to_remotes(hosts_to_states)
+ self._push_to_remotes(to_federation_ping.values())
def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
@@ -509,6 +507,73 @@ class PresenceHandler(object):
self.external_process_to_current_syncs[process_id] = syncing_user_ids
@defer.inlineCallbacks
+ def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+ """Update the syncing users for an external process as a delta.
+
+ Args:
+ process_id (str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as user start and stop syncing against a given process.
+ user_id (str): The user who has started or stopped syncing
+ is_syncing (bool): Whether or not the user is now syncing
+ sync_time_msec(int): Time in ms when the user was last syncing
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ prev_state = yield self.current_state_for_user(user_id)
+
+ process_presence = self.external_process_to_current_syncs.setdefault(
+ process_id, set()
+ )
+
+ updates = []
+ if is_syncing and user_id not in process_presence:
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=sync_time_msec,
+ last_user_sync_ts=sync_time_msec,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+ process_presence.add(user_id)
+ elif user_id in process_presence:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+
+ if not is_syncing:
+ process_presence.discard(user_id)
+
+ if updates:
+ yield self._update_states(updates)
+
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+
+ @defer.inlineCallbacks
+ def update_external_syncs_clear(self, process_id):
+ """Marks all users that had been marked as syncing by a given process
+ as offline.
+
+ Used when the process has stopped/disappeared.
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ process_presence = self.external_process_to_current_syncs.pop(
+ process_id, set()
+ )
+ prev_states = yield self.current_state_for_users(process_presence)
+ time_now_ms = self.clock.time_msec()
+
+ yield self._update_states([
+ prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ )
+ for prev_state in prev_states.itervalues()
+ ])
+ self.external_process_last_updated_ms.pop(process_id, None)
+
+ @defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
"""
@@ -527,14 +592,14 @@ class PresenceHandler(object):
for user_id in user_ids
}
- missing = [user_id for user_id, state in states.items() if not state]
+ missing = [user_id for user_id, state in states.iteritems() if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
states.update(res)
- missing = [user_id for user_id, state in states.items() if not state]
+ missing = [user_id for user_id, state in states.iteritems() if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id)
@@ -546,88 +611,39 @@ class PresenceHandler(object):
defer.returnValue(states)
@defer.inlineCallbacks
- def _get_interested_parties(self, states, calculate_remote_hosts=True):
- """Given a list of states return which entities (rooms, users, servers)
- are interested in the given states.
-
- Returns:
- 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
- with each item being a dict of `entity_name` -> `[UserPresenceState]`
- """
- room_ids_to_states = {}
- users_to_states = {}
- for state in states:
- room_ids = yield self.store.get_rooms_for_user(state.user_id)
- for room_id in room_ids:
- room_ids_to_states.setdefault(room_id, []).append(state)
-
- plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
- for u in plist:
- users_to_states.setdefault(u, []).append(state)
-
- # Always notify self
- users_to_states.setdefault(state.user_id, []).append(state)
-
- hosts_to_states = {}
- if calculate_remote_hosts:
- for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- hosts = yield self.store.get_hosts_in_room(room_id)
-
- for host in hosts:
- hosts_to_states.setdefault(host, []).extend(local_states)
-
- for user_id, states in users_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- host = get_domain_from_id(user_id)
- hosts_to_states.setdefault(host, []).extend(local_states)
-
- # TODO: de-dup hosts_to_states, as a single host might have multiple
- # of same presence
-
- defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
-
- @defer.inlineCallbacks
def _persist_and_notify(self, states):
"""Persist states in the database, poke the notifier and send to
interested remote servers
"""
stream_id, max_token = yield self.store.update_presence(states)
- parties = yield self._get_interested_parties(states)
- room_ids_to_states, users_to_states, hosts_to_states = parties
+ parties = yield get_interested_parties(self.store, states)
+ room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states.keys()]
+ users=[UserID.from_string(u) for u in users_to_states]
)
- self._push_to_remotes(hosts_to_states)
+ self._push_to_remotes(states)
@defer.inlineCallbacks
def notify_for_states(self, state, stream_id):
- parties = yield self._get_interested_parties([state])
- room_ids_to_states, users_to_states, hosts_to_states = parties
+ parties = yield get_interested_parties(self.store, [state])
+ room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states.keys()]
+ users=[UserID.from_string(u) for u in users_to_states]
)
- def _push_to_remotes(self, hosts_to_states):
+ def _push_to_remotes(self, states):
"""Sends state updates to remote servers.
Args:
- hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+ states (list(UserPresenceState))
"""
- for host, states in hosts_to_states.items():
- self.federation.send_presence(host, states)
+ self.federation.send_presence(states)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
@@ -768,14 +784,13 @@ class PresenceHandler(object):
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
- hosts = set(get_domain_from_id(u) for u in user_ids)
- self._push_to_remotes({host: (state,) for host in hosts})
+ self._push_to_remotes([state])
else:
user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
- self._push_to_remotes({user.domain: states.values()})
+ self._push_to_remotes(states.values())
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@@ -1275,3 +1290,66 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
persist_and_notify = True
return new_state, persist_and_notify, federation_ping
+
+
+@defer.inlineCallbacks
+def get_interested_parties(store, states):
+ """Given a list of states return which entities (rooms, users)
+ are interested in the given states.
+
+ Args:
+ states (list(UserPresenceState))
+
+ Returns:
+ 2-tuple: `(room_ids_to_states, users_to_states)`,
+ with each item being a dict of `entity_name` -> `[UserPresenceState]`
+ """
+ room_ids_to_states = {}
+ users_to_states = {}
+ for state in states:
+ room_ids = yield store.get_rooms_for_user(state.user_id)
+ for room_id in room_ids:
+ room_ids_to_states.setdefault(room_id, []).append(state)
+
+ plist = yield store.get_presence_list_observers_accepted(state.user_id)
+ for u in plist:
+ users_to_states.setdefault(u, []).append(state)
+
+ # Always notify self
+ users_to_states.setdefault(state.user_id, []).append(state)
+
+ defer.returnValue((room_ids_to_states, users_to_states))
+
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states):
+ """Given a list of presence states figure out which remote servers
+ should be sent which.
+
+ All the presence states should be for local users only.
+
+ Args:
+ store (DataStore)
+ states (list(UserPresenceState))
+
+ Returns:
+ Deferred list of ([destinations], [UserPresenceState]), where for
+ each row the list of UserPresenceState should be sent to each
+ destination
+ """
+ hosts_and_states = []
+
+ # First we look up the rooms each user is in (as well as any explicit
+ # subscriptions), then for each distinct room we look up the remote
+ # hosts in those rooms.
+ room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
+
+ for room_id, states in room_ids_to_states.iteritems():
+ hosts = yield store.get_hosts_in_room(room_id)
+ hosts_and_states.append((hosts, states))
+
+ for user_id, states in users_to_states.iteritems():
+ host = get_domain_from_id(user_id)
+ hosts_and_states.append(([host], states))
+
+ defer.returnValue(hosts_and_states)
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
new file mode 100644
index 0000000000..3f46a16b90
--- /dev/null
+++ b/synapse/handlers/read_marker.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseHandler
+
+from twisted.internet import defer
+
+from synapse.util.async import Linearizer
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class ReadMarkerHandler(BaseHandler):
+ def __init__(self, hs):
+ super(ReadMarkerHandler, self).__init__(hs)
+ self.server_name = hs.config.server_name
+ self.store = hs.get_datastore()
+ self.read_marker_linearizer = Linearizer(name="read_marker")
+ self.notifier = hs.get_notifier()
+
+ @defer.inlineCallbacks
+ def received_client_read_marker(self, room_id, user_id, event_id):
+ """Updates the read marker for a given user in a given room if the event ID given
+ is ahead in the stream relative to the current read marker.
+
+ This uses a notifier to indicate that account data should be sent down /sync if
+ the read marker has changed.
+ """
+
+ with (yield self.read_marker_linearizer.queue((room_id, user_id))):
+ account_data = yield self.store.get_account_data_for_room(user_id, room_id)
+
+ existing_read_marker = account_data.get("m.read_marker", None)
+
+ should_update = True
+
+ if existing_read_marker:
+ # Only update if the new marker is ahead in the stream
+ should_update = yield self.store.is_event_after(
+ event_id,
+ existing_read_marker['marker']
+ )
+
+ if should_update:
+ content = {
+ "marker": event_id
+ }
+ max_id = yield self.store.add_account_data_to_room(
+ user_id, room_id, "m.read_marker", content
+ )
+ self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0eea7f8f9c..3b7818af5c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id
import logging
from collections import namedtuple
-import ujson as json
logger = logging.getLogger(__name__)
@@ -288,11 +287,13 @@ class TypingHandler(object):
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
typing = self._room_typing[room_id]
- typing_bytes = json.dumps(list(typing), ensure_ascii=False)
- rows.append((serial, room_id, typing_bytes))
+ rows.append((serial, room_id, list(typing)))
rows.sort()
return rows
+ def get_current_token(self):
+ return self._latest_room_serial
+
class TypingNotificationEventSource(object):
def __init__(self, hs):
|