diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 5a9e7720d9..8c3cf9e801 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,7 +20,7 @@ from .appservice import (
from ._base import Cache
from .directory import DirectoryStore
from .events import EventsStore
-from .presence import PresenceStore
+from .presence import PresenceStore, UserPresenceState
from .profile import ProfileStore
from .registration import RegistrationStore
from .room import RoomStore
@@ -47,6 +47,7 @@ from .account_data import AccountDataStore
from util.id_generators import IdGenerator, StreamIdGenerator
+from synapse.api.constants import PresenceState
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -110,6 +111,9 @@ class DataStore(RoomMemberStore, RoomStore,
self._account_data_id_gen = StreamIdGenerator(
db_conn, "account_data_max_stream_id", "stream_id"
)
+ self._presence_id_gen = StreamIdGenerator(
+ db_conn, "presence_stream", "stream_id"
+ )
self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
@@ -119,7 +123,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
- events_max = self._stream_id_gen.get_max_token(None)
+ events_max = self._stream_id_gen.get_max_token()
event_cache_prefill, min_event_val = self._get_cache_dict(
db_conn, "events",
entity_column="room_id",
@@ -135,13 +139,31 @@ class DataStore(RoomMemberStore, RoomStore,
"MembershipStreamChangeCache", events_max,
)
- account_max = self._account_data_id_gen.get_max_token(None)
+ account_max = self._account_data_id_gen.get_max_token()
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max,
)
+ self.__presence_on_startup = self._get_active_presence(db_conn)
+
+ presence_cache_prefill, min_presence_val = self._get_cache_dict(
+ db_conn, "presence_stream",
+ entity_column="user_id",
+ stream_column="stream_id",
+ max_value=self._presence_id_gen.get_max_token(),
+ )
+ self.presence_stream_cache = StreamChangeCache(
+ "PresenceStreamChangeCache", min_presence_val,
+ prefilled_cache=presence_cache_prefill
+ )
+
super(DataStore, self).__init__(hs)
+ def take_presence_startup_info(self):
+ active_on_startup = self.__presence_on_startup
+ self.__presence_on_startup = None
+ return active_on_startup
+
def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
# It doesn't really matter how many we get, the StreamChangeCache will
@@ -161,6 +183,7 @@ class DataStore(RoomMemberStore, RoomStore,
txn = db_conn.cursor()
txn.execute(sql, (int(max_value),))
rows = txn.fetchall()
+ txn.close()
cache = {
row[0]: int(row[1])
@@ -174,6 +197,27 @@ class DataStore(RoomMemberStore, RoomStore,
return cache, min_val
+ def _get_active_presence(self, db_conn):
+ """Fetch non-offline presence from the database so that we can register
+ the appropriate time outs.
+ """
+
+ sql = (
+ "SELECT user_id, state, last_active, last_federation_update,"
+ " last_user_sync, status_msg, currently_active FROM presence_stream"
+ " WHERE state != ?"
+ )
+ sql = self.database_engine.convert_param_style(sql)
+
+ txn = db_conn.cursor()
+ txn.execute(sql, (PresenceState.OFFLINE,))
+ rows = self.cursor_to_dict(txn)
+
+ for row in rows:
+ row["currently_active"] = bool(row["currently_active"])
+
+ return [UserPresenceState(**row) for row in rows]
+
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 850736c85e..0fd5d497ab 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 29
+SCHEMA_VERSION = 30
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index ef525f34c5..b133979102 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -14,73 +14,128 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.api.constants import PresenceState
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from collections import namedtuple
from twisted.internet import defer
-class PresenceStore(SQLBaseStore):
- def create_presence(self, user_localpart):
- res = self._simple_insert(
- table="presence",
- values={"user_id": user_localpart},
- desc="create_presence",
+class UserPresenceState(namedtuple("UserPresenceState",
+ ("user_id", "state", "last_active", "last_federation_update",
+ "last_user_sync", "status_msg", "currently_active"))):
+ """Represents the current presence state of the user.
+
+ user_id (str)
+ last_active (int): Time in msec that the user last interacted with server.
+ last_federation_update (int): Time in msec since either a) we sent a presence
+ update to other servers or b) we received a presence update, depending
+ on if is a local user or not.
+ last_user_sync (int): Time in msec that the user last *completed* a sync
+ (or event stream).
+ status_msg (str): User set status message.
+ """
+
+ def copy_and_replace(self, **kwargs):
+ return self._replace(**kwargs)
+
+ @classmethod
+ def default(cls, user_id):
+ """Returns a default presence state.
+ """
+ return cls(
+ user_id=user_id,
+ state=PresenceState.OFFLINE,
+ last_active=0,
+ last_federation_update=0,
+ last_user_sync=0,
+ status_msg=None,
+ currently_active=False,
)
- self.get_presence_state.invalidate((user_localpart,))
- return res
- def has_presence_state(self, user_localpart):
- return self._simple_select_one(
- table="presence",
- keyvalues={"user_id": user_localpart},
- retcols=["user_id"],
- allow_none=True,
- desc="has_presence_state",
+class PresenceStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def update_presence(self, presence_states):
+ stream_id_manager = yield self._presence_id_gen.get_next(self)
+ with stream_id_manager as stream_id:
+ yield self.runInteraction(
+ "update_presence",
+ self._update_presence_txn, stream_id, presence_states,
+ )
+
+ defer.returnValue((stream_id, self._presence_id_gen.get_max_token()))
+
+ def _update_presence_txn(self, txn, stream_id, presence_states):
+ for state in presence_states:
+ txn.call_after(
+ self.presence_stream_cache.entity_has_changed,
+ state.user_id, stream_id,
+ )
+
+ # Actually insert new rows
+ self._simple_insert_many_txn(
+ txn,
+ table="presence_stream",
+ values=[
+ {
+ "stream_id": stream_id,
+ "user_id": state.user_id,
+ "state": state.state,
+ "last_active": state.last_active,
+ "last_federation_update": state.last_federation_update,
+ "last_user_sync": state.last_user_sync,
+ "status_msg": state.status_msg,
+ "currently_active": state.currently_active,
+ }
+ for state in presence_states
+ ],
)
- @cached(max_entries=2000)
- def get_presence_state(self, user_localpart):
- return self._simple_select_one(
- table="presence",
- keyvalues={"user_id": user_localpart},
- retcols=["state", "status_msg", "mtime"],
- desc="get_presence_state",
+ # Delete old rows to stop database from getting really big
+ sql = (
+ "DELETE FROM presence_stream WHERE"
+ " stream_id < ?"
+ " AND user_id IN (%s)"
)
- @cachedList(get_presence_state.cache, list_name="user_localparts",
- inlineCallbacks=True)
- def get_presence_states(self, user_localparts):
+ batches = (
+ presence_states[i:i + 50]
+ for i in xrange(0, len(presence_states), 50)
+ )
+ for states in batches:
+ args = [stream_id]
+ args.extend(s.user_id for s in states)
+ txn.execute(
+ sql % (",".join("?" for _ in states),),
+ args
+ )
+
+ @defer.inlineCallbacks
+ def get_presence_for_users(self, user_ids):
rows = yield self._simple_select_many_batch(
- table="presence",
+ table="presence_stream",
column="user_id",
- iterable=user_localparts,
- retcols=("user_id", "state", "status_msg", "mtime",),
- desc="get_presence_states",
+ iterable=user_ids,
+ keyvalues={},
+ retcols=(
+ "user_id",
+ "state",
+ "last_active",
+ "last_federation_update",
+ "last_user_sync",
+ "status_msg",
+ "currently_active",
+ ),
)
- defer.returnValue({
- row["user_id"]: {
- "state": row["state"],
- "status_msg": row["status_msg"],
- "mtime": row["mtime"],
- }
- for row in rows
- })
+ for row in rows:
+ row["currently_active"] = bool(row["currently_active"])
- @defer.inlineCallbacks
- def set_presence_state(self, user_localpart, new_state):
- res = yield self._simple_update_one(
- table="presence",
- keyvalues={"user_id": user_localpart},
- updatevalues={"state": new_state["state"],
- "status_msg": new_state["status_msg"],
- "mtime": self._clock.time_msec()},
- desc="set_presence_state",
- )
+ defer.returnValue([UserPresenceState(**row) for row in rows])
- self.get_presence_state.invalidate((user_localpart,))
- defer.returnValue(res)
+ def get_current_presence_token(self):
+ return self._presence_id_gen.get_max_token()
def allow_presence_visible(self, observed_localpart, observer_userid):
return self._simple_insert(
@@ -128,6 +183,7 @@ class PresenceStore(SQLBaseStore):
desc="set_presence_list_accepted",
)
self.get_presence_list_accepted.invalidate((observer_localpart,))
+ self.get_presence_list_observers_accepted.invalidate((observed_userid,))
defer.returnValue(result)
def get_presence_list(self, observer_localpart, accepted=None):
@@ -154,6 +210,19 @@ class PresenceStore(SQLBaseStore):
desc="get_presence_list_accepted",
)
+ @cachedInlineCallbacks()
+ def get_presence_list_observers_accepted(self, observed_userid):
+ user_localparts = yield self._simple_select_onecol(
+ table="presence_list",
+ keyvalues={"observed_user_id": observed_userid, "accepted": True},
+ retcol="user_id",
+ desc="get_presence_list_accepted",
+ )
+
+ defer.returnValue([
+ "@%s:%s" % (u, self.hs.hostname,) for u in user_localparts
+ ])
+
@defer.inlineCallbacks
def del_presence_list(self, observer_localpart, observed_userid):
yield self._simple_delete_one(
@@ -163,3 +232,4 @@ class PresenceStore(SQLBaseStore):
desc="del_presence_list",
)
self.get_presence_list_accepted.invalidate((observer_localpart,))
+ self.get_presence_list_observers_accepted.invalidate((observed_userid,))
diff --git a/synapse/storage/schema/delta/30/presence_stream.sql b/synapse/storage/schema/delta/30/presence_stream.sql
new file mode 100644
index 0000000000..14f5e3d30a
--- /dev/null
+++ b/synapse/storage/schema/delta/30/presence_stream.sql
@@ -0,0 +1,30 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+ CREATE TABLE presence_stream(
+ stream_id BIGINT,
+ user_id TEXT,
+ state TEXT,
+ last_active BIGINT,
+ last_federation_update BIGINT,
+ last_user_sync BIGINT,
+ status_msg TEXT,
+ currently_active BOOLEAN
+ );
+
+ CREATE INDEX presence_stream_id ON presence_stream(stream_id, user_id);
+ CREATE INDEX presence_stream_user_id ON presence_stream(user_id);
+ CREATE INDEX presence_stream_state ON presence_stream(state);
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 5c522f4ab9..5ce54f76de 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -130,9 +130,11 @@ class StreamIdGenerator(object):
return manager()
- def get_max_token(self, store):
+ def get_max_token(self, *args):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
+
+ Used to take a DataStore param, which is no longer needed.
"""
with self._lock:
if self._unfinished_ids:
|