diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index ef525f34c5..b133979102 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -14,73 +14,128 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.api.constants import PresenceState
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from collections import namedtuple
from twisted.internet import defer
-class PresenceStore(SQLBaseStore):
- def create_presence(self, user_localpart):
- res = self._simple_insert(
- table="presence",
- values={"user_id": user_localpart},
- desc="create_presence",
+class UserPresenceState(namedtuple("UserPresenceState",
+ ("user_id", "state", "last_active", "last_federation_update",
+ "last_user_sync", "status_msg", "currently_active"))):
+ """Represents the current presence state of the user.
+
+ user_id (str)
+ last_active (int): Time in msec that the user last interacted with server.
+ last_federation_update (int): Time in msec since either a) we sent a presence
+ update to other servers or b) we received a presence update, depending
+ on if is a local user or not.
+ last_user_sync (int): Time in msec that the user last *completed* a sync
+ (or event stream).
+ status_msg (str): User set status message.
+ """
+
+ def copy_and_replace(self, **kwargs):
+ return self._replace(**kwargs)
+
+ @classmethod
+ def default(cls, user_id):
+ """Returns a default presence state.
+ """
+ return cls(
+ user_id=user_id,
+ state=PresenceState.OFFLINE,
+ last_active=0,
+ last_federation_update=0,
+ last_user_sync=0,
+ status_msg=None,
+ currently_active=False,
)
- self.get_presence_state.invalidate((user_localpart,))
- return res
- def has_presence_state(self, user_localpart):
- return self._simple_select_one(
- table="presence",
- keyvalues={"user_id": user_localpart},
- retcols=["user_id"],
- allow_none=True,
- desc="has_presence_state",
+class PresenceStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def update_presence(self, presence_states):
+ stream_id_manager = yield self._presence_id_gen.get_next(self)
+ with stream_id_manager as stream_id:
+ yield self.runInteraction(
+ "update_presence",
+ self._update_presence_txn, stream_id, presence_states,
+ )
+
+ defer.returnValue((stream_id, self._presence_id_gen.get_max_token()))
+
+ def _update_presence_txn(self, txn, stream_id, presence_states):
+ for state in presence_states:
+ txn.call_after(
+ self.presence_stream_cache.entity_has_changed,
+ state.user_id, stream_id,
+ )
+
+ # Actually insert new rows
+ self._simple_insert_many_txn(
+ txn,
+ table="presence_stream",
+ values=[
+ {
+ "stream_id": stream_id,
+ "user_id": state.user_id,
+ "state": state.state,
+ "last_active": state.last_active,
+ "last_federation_update": state.last_federation_update,
+ "last_user_sync": state.last_user_sync,
+ "status_msg": state.status_msg,
+ "currently_active": state.currently_active,
+ }
+ for state in presence_states
+ ],
)
- @cached(max_entries=2000)
- def get_presence_state(self, user_localpart):
- return self._simple_select_one(
- table="presence",
- keyvalues={"user_id": user_localpart},
- retcols=["state", "status_msg", "mtime"],
- desc="get_presence_state",
+ # Delete old rows to stop database from getting really big
+ sql = (
+ "DELETE FROM presence_stream WHERE"
+ " stream_id < ?"
+ " AND user_id IN (%s)"
)
- @cachedList(get_presence_state.cache, list_name="user_localparts",
- inlineCallbacks=True)
- def get_presence_states(self, user_localparts):
+ batches = (
+ presence_states[i:i + 50]
+ for i in xrange(0, len(presence_states), 50)
+ )
+ for states in batches:
+ args = [stream_id]
+ args.extend(s.user_id for s in states)
+ txn.execute(
+ sql % (",".join("?" for _ in states),),
+ args
+ )
+
+ @defer.inlineCallbacks
+ def get_presence_for_users(self, user_ids):
rows = yield self._simple_select_many_batch(
- table="presence",
+ table="presence_stream",
column="user_id",
- iterable=user_localparts,
- retcols=("user_id", "state", "status_msg", "mtime",),
- desc="get_presence_states",
+ iterable=user_ids,
+ keyvalues={},
+ retcols=(
+ "user_id",
+ "state",
+ "last_active",
+ "last_federation_update",
+ "last_user_sync",
+ "status_msg",
+ "currently_active",
+ ),
)
- defer.returnValue({
- row["user_id"]: {
- "state": row["state"],
- "status_msg": row["status_msg"],
- "mtime": row["mtime"],
- }
- for row in rows
- })
+ for row in rows:
+ row["currently_active"] = bool(row["currently_active"])
- @defer.inlineCallbacks
- def set_presence_state(self, user_localpart, new_state):
- res = yield self._simple_update_one(
- table="presence",
- keyvalues={"user_id": user_localpart},
- updatevalues={"state": new_state["state"],
- "status_msg": new_state["status_msg"],
- "mtime": self._clock.time_msec()},
- desc="set_presence_state",
- )
+ defer.returnValue([UserPresenceState(**row) for row in rows])
- self.get_presence_state.invalidate((user_localpart,))
- defer.returnValue(res)
+ def get_current_presence_token(self):
+ return self._presence_id_gen.get_max_token()
def allow_presence_visible(self, observed_localpart, observer_userid):
return self._simple_insert(
@@ -128,6 +183,7 @@ class PresenceStore(SQLBaseStore):
desc="set_presence_list_accepted",
)
self.get_presence_list_accepted.invalidate((observer_localpart,))
+ self.get_presence_list_observers_accepted.invalidate((observed_userid,))
defer.returnValue(result)
def get_presence_list(self, observer_localpart, accepted=None):
@@ -154,6 +210,19 @@ class PresenceStore(SQLBaseStore):
desc="get_presence_list_accepted",
)
+ @cachedInlineCallbacks()
+ def get_presence_list_observers_accepted(self, observed_userid):
+ user_localparts = yield self._simple_select_onecol(
+ table="presence_list",
+ keyvalues={"observed_user_id": observed_userid, "accepted": True},
+ retcol="user_id",
+ desc="get_presence_list_accepted",
+ )
+
+ defer.returnValue([
+ "@%s:%s" % (u, self.hs.hostname,) for u in user_localparts
+ ])
+
@defer.inlineCallbacks
def del_presence_list(self, observer_localpart, observed_userid):
yield self._simple_delete_one(
@@ -163,3 +232,4 @@ class PresenceStore(SQLBaseStore):
desc="del_presence_list",
)
self.get_presence_list_accepted.invalidate((observer_localpart,))
+ self.get_presence_list_observers_accepted.invalidate((observed_userid,))
|