From e5999bfb1a4aab56acecb59ed6d068442f5b11a0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Feb 2016 17:10:40 +0000 Subject: Initial cut --- synapse/storage/__init__.py | 50 +++++- synapse/storage/prepare_database.py | 2 +- synapse/storage/presence.py | 170 +++++++++++++++------ .../storage/schema/delta/30/presence_stream.sql | 30 ++++ synapse/storage/util/id_generators.py | 4 +- 5 files changed, 201 insertions(+), 55 deletions(-) create mode 100644 synapse/storage/schema/delta/30/presence_stream.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5a9e7720d9..8c3cf9e801 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,7 +20,7 @@ from .appservice import ( from ._base import Cache from .directory import DirectoryStore from .events import EventsStore -from .presence import PresenceStore +from .presence import PresenceStore, UserPresenceState from .profile import ProfileStore from .registration import RegistrationStore from .room import RoomStore @@ -47,6 +47,7 @@ from .account_data import AccountDataStore from util.id_generators import IdGenerator, StreamIdGenerator +from synapse.api.constants import PresenceState from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -110,6 +111,9 @@ class DataStore(RoomMemberStore, RoomStore, self._account_data_id_gen = StreamIdGenerator( db_conn, "account_data_max_stream_id", "stream_id" ) + self._presence_id_gen = StreamIdGenerator( + db_conn, "presence_stream", "stream_id" + ) self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) self._state_groups_id_gen = IdGenerator("state_groups", "id", self) @@ -119,7 +123,7 @@ class DataStore(RoomMemberStore, RoomStore, self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) - events_max = self._stream_id_gen.get_max_token(None) + events_max = self._stream_id_gen.get_max_token() event_cache_prefill, min_event_val = self._get_cache_dict( db_conn, "events", entity_column="room_id", @@ -135,13 +139,31 @@ class DataStore(RoomMemberStore, RoomStore, "MembershipStreamChangeCache", events_max, ) - account_max = self._account_data_id_gen.get_max_token(None) + account_max = self._account_data_id_gen.get_max_token() self._account_data_stream_cache = StreamChangeCache( "AccountDataAndTagsChangeCache", account_max, ) + self.__presence_on_startup = self._get_active_presence(db_conn) + + presence_cache_prefill, min_presence_val = self._get_cache_dict( + db_conn, "presence_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=self._presence_id_gen.get_max_token(), + ) + self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", min_presence_val, + prefilled_cache=presence_cache_prefill + ) + super(DataStore, self).__init__(hs) + def take_presence_startup_info(self): + active_on_startup = self.__presence_on_startup + self.__presence_on_startup = None + return active_on_startup + def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): # Fetch a mapping of room_id -> max stream position for "recent" rooms. # It doesn't really matter how many we get, the StreamChangeCache will @@ -161,6 +183,7 @@ class DataStore(RoomMemberStore, RoomStore, txn = db_conn.cursor() txn.execute(sql, (int(max_value),)) rows = txn.fetchall() + txn.close() cache = { row[0]: int(row[1]) @@ -174,6 +197,27 @@ class DataStore(RoomMemberStore, RoomStore, return cache, min_val + def _get_active_presence(self, db_conn): + """Fetch non-offline presence from the database so that we can register + the appropriate time outs. + """ + + sql = ( + "SELECT user_id, state, last_active, last_federation_update," + " last_user_sync, status_msg, currently_active FROM presence_stream" + " WHERE state != ?" + ) + sql = self.database_engine.convert_param_style(sql) + + txn = db_conn.cursor() + txn.execute(sql, (PresenceState.OFFLINE,)) + rows = self.cursor_to_dict(txn) + + for row in rows: + row["currently_active"] = bool(row["currently_active"]) + + return [UserPresenceState(**row) for row in rows] + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent): now = int(self._clock.time_msec()) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 850736c85e..0fd5d497ab 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 29 +SCHEMA_VERSION = 30 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index ef525f34c5..b133979102 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -14,73 +14,128 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList +from synapse.api.constants import PresenceState +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from collections import namedtuple from twisted.internet import defer -class PresenceStore(SQLBaseStore): - def create_presence(self, user_localpart): - res = self._simple_insert( - table="presence", - values={"user_id": user_localpart}, - desc="create_presence", +class UserPresenceState(namedtuple("UserPresenceState", + ("user_id", "state", "last_active", "last_federation_update", + "last_user_sync", "status_msg", "currently_active"))): + """Represents the current presence state of the user. + + user_id (str) + last_active (int): Time in msec that the user last interacted with server. + last_federation_update (int): Time in msec since either a) we sent a presence + update to other servers or b) we received a presence update, depending + on if is a local user or not. + last_user_sync (int): Time in msec that the user last *completed* a sync + (or event stream). + status_msg (str): User set status message. + """ + + def copy_and_replace(self, **kwargs): + return self._replace(**kwargs) + + @classmethod + def default(cls, user_id): + """Returns a default presence state. + """ + return cls( + user_id=user_id, + state=PresenceState.OFFLINE, + last_active=0, + last_federation_update=0, + last_user_sync=0, + status_msg=None, + currently_active=False, ) - self.get_presence_state.invalidate((user_localpart,)) - return res - def has_presence_state(self, user_localpart): - return self._simple_select_one( - table="presence", - keyvalues={"user_id": user_localpart}, - retcols=["user_id"], - allow_none=True, - desc="has_presence_state", +class PresenceStore(SQLBaseStore): + @defer.inlineCallbacks + def update_presence(self, presence_states): + stream_id_manager = yield self._presence_id_gen.get_next(self) + with stream_id_manager as stream_id: + yield self.runInteraction( + "update_presence", + self._update_presence_txn, stream_id, presence_states, + ) + + defer.returnValue((stream_id, self._presence_id_gen.get_max_token())) + + def _update_presence_txn(self, txn, stream_id, presence_states): + for state in presence_states: + txn.call_after( + self.presence_stream_cache.entity_has_changed, + state.user_id, stream_id, + ) + + # Actually insert new rows + self._simple_insert_many_txn( + txn, + table="presence_stream", + values=[ + { + "stream_id": stream_id, + "user_id": state.user_id, + "state": state.state, + "last_active": state.last_active, + "last_federation_update": state.last_federation_update, + "last_user_sync": state.last_user_sync, + "status_msg": state.status_msg, + "currently_active": state.currently_active, + } + for state in presence_states + ], ) - @cached(max_entries=2000) - def get_presence_state(self, user_localpart): - return self._simple_select_one( - table="presence", - keyvalues={"user_id": user_localpart}, - retcols=["state", "status_msg", "mtime"], - desc="get_presence_state", + # Delete old rows to stop database from getting really big + sql = ( + "DELETE FROM presence_stream WHERE" + " stream_id < ?" + " AND user_id IN (%s)" ) - @cachedList(get_presence_state.cache, list_name="user_localparts", - inlineCallbacks=True) - def get_presence_states(self, user_localparts): + batches = ( + presence_states[i:i + 50] + for i in xrange(0, len(presence_states), 50) + ) + for states in batches: + args = [stream_id] + args.extend(s.user_id for s in states) + txn.execute( + sql % (",".join("?" for _ in states),), + args + ) + + @defer.inlineCallbacks + def get_presence_for_users(self, user_ids): rows = yield self._simple_select_many_batch( - table="presence", + table="presence_stream", column="user_id", - iterable=user_localparts, - retcols=("user_id", "state", "status_msg", "mtime",), - desc="get_presence_states", + iterable=user_ids, + keyvalues={}, + retcols=( + "user_id", + "state", + "last_active", + "last_federation_update", + "last_user_sync", + "status_msg", + "currently_active", + ), ) - defer.returnValue({ - row["user_id"]: { - "state": row["state"], - "status_msg": row["status_msg"], - "mtime": row["mtime"], - } - for row in rows - }) + for row in rows: + row["currently_active"] = bool(row["currently_active"]) - @defer.inlineCallbacks - def set_presence_state(self, user_localpart, new_state): - res = yield self._simple_update_one( - table="presence", - keyvalues={"user_id": user_localpart}, - updatevalues={"state": new_state["state"], - "status_msg": new_state["status_msg"], - "mtime": self._clock.time_msec()}, - desc="set_presence_state", - ) + defer.returnValue([UserPresenceState(**row) for row in rows]) - self.get_presence_state.invalidate((user_localpart,)) - defer.returnValue(res) + def get_current_presence_token(self): + return self._presence_id_gen.get_max_token() def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( @@ -128,6 +183,7 @@ class PresenceStore(SQLBaseStore): desc="set_presence_list_accepted", ) self.get_presence_list_accepted.invalidate((observer_localpart,)) + self.get_presence_list_observers_accepted.invalidate((observed_userid,)) defer.returnValue(result) def get_presence_list(self, observer_localpart, accepted=None): @@ -154,6 +210,19 @@ class PresenceStore(SQLBaseStore): desc="get_presence_list_accepted", ) + @cachedInlineCallbacks() + def get_presence_list_observers_accepted(self, observed_userid): + user_localparts = yield self._simple_select_onecol( + table="presence_list", + keyvalues={"observed_user_id": observed_userid, "accepted": True}, + retcol="user_id", + desc="get_presence_list_accepted", + ) + + defer.returnValue([ + "@%s:%s" % (u, self.hs.hostname,) for u in user_localparts + ]) + @defer.inlineCallbacks def del_presence_list(self, observer_localpart, observed_userid): yield self._simple_delete_one( @@ -163,3 +232,4 @@ class PresenceStore(SQLBaseStore): desc="del_presence_list", ) self.get_presence_list_accepted.invalidate((observer_localpart,)) + self.get_presence_list_observers_accepted.invalidate((observed_userid,)) diff --git a/synapse/storage/schema/delta/30/presence_stream.sql b/synapse/storage/schema/delta/30/presence_stream.sql new file mode 100644 index 0000000000..14f5e3d30a --- /dev/null +++ b/synapse/storage/schema/delta/30/presence_stream.sql @@ -0,0 +1,30 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + CREATE TABLE presence_stream( + stream_id BIGINT, + user_id TEXT, + state TEXT, + last_active BIGINT, + last_federation_update BIGINT, + last_user_sync BIGINT, + status_msg TEXT, + currently_active BOOLEAN + ); + + CREATE INDEX presence_stream_id ON presence_stream(stream_id, user_id); + CREATE INDEX presence_stream_user_id ON presence_stream(user_id); + CREATE INDEX presence_stream_state ON presence_stream(state); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 5c522f4ab9..5ce54f76de 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -130,9 +130,11 @@ class StreamIdGenerator(object): return manager() - def get_max_token(self, store): + def get_max_token(self, *args): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. + + Used to take a DataStore param, which is no longer needed. """ with self._lock: if self._unfinished_ids: -- cgit 1.5.1 From 112283e23005bdaa17b6184cb55fd786facff47d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Feb 2016 10:11:43 +0000 Subject: Prefix TS fields with _ts --- synapse/handlers/presence.py | 54 +++++++++++----------- synapse/storage/__init__.py | 4 +- synapse/storage/presence.py | 23 ++++----- .../storage/schema/delta/30/presence_stream.sql | 6 +-- 4 files changed, 44 insertions(+), 43 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 3137c23509..d296953651 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -127,24 +127,24 @@ class PresenceHandler(BaseHandler): self.wheel_timer.insert( now=now, obj=state.user_id, - then=state.last_active + IDLE_TIMER, + then=state.last_active_ts + IDLE_TIMER, ) self.wheel_timer.insert( now=now, obj=state.user_id, - then=state.last_user_sync + SYNC_ONLINE_TIMEOUT, + then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, ) if self.hs.is_mine_id(state.user_id): self.wheel_timer.insert( now=now, obj=state.user_id, - then=state.last_federation_update + FEDERATION_PING_INTERVAL, + then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL, ) else: self.wheel_timer.insert( now=now, obj=state.user_id, - then=state.last_federation_update + FEDERATION_TIMEOUT, + then=state.last_federation_update_ts + FEDERATION_TIMEOUT, ) # Set of users who have presence in the `user_to_current_state` that @@ -225,7 +225,7 @@ class PresenceHandler(BaseHandler): self.wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_active + IDLE_TIMER + then=new_state.last_active_ts + IDLE_TIMER ) if new_state.state != PresenceState.OFFLINE: @@ -233,14 +233,14 @@ class PresenceHandler(BaseHandler): self.wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_user_sync + SYNC_ONLINE_TIMEOUT + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT ) - last_federate = new_state.last_federation_update + last_federate = new_state.last_federation_update_ts if now - last_federate > FEDERATION_PING_INTERVAL: # Been a while since we've poked remote servers new_state = new_state.copy_and_replace( - last_federation_update=now, + last_federation_update_ts=now, ) to_federation_ping[user_id] = new_state @@ -248,11 +248,11 @@ class PresenceHandler(BaseHandler): self.wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_federation_update + FEDERATION_TIMEOUT + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT ) if new_state.state == PresenceState.ONLINE: - currently_active = now - new_state.last_active < LAST_ACTIVE_GRANULARITY + currently_active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY new_state = new_state.copy_and_replace( currently_active=currently_active, ) @@ -260,7 +260,7 @@ class PresenceHandler(BaseHandler): # Check whether the change was something worth notifying about if should_notify(prev_state, new_state): new_state.copy_and_replace( - last_federation_update=now, + last_federation_update_ts=now, ) to_notify[user_id] = new_state @@ -309,18 +309,18 @@ class PresenceHandler(BaseHandler): if self.hs.is_mine_id(user_id): if state.state == PresenceState.ONLINE: - if now - state.last_active > IDLE_TIMER: + if now - state.last_active_ts > IDLE_TIMER: # Currently online, but last activity ages ago so auto # idle changes[user_id] = state.copy_and_replace( state=PresenceState.UNAVAILABLE, ) - elif now - state.last_active > LAST_ACTIVE_GRANULARITY: + elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: # So that we send down a notification that we've # stopped updating. changes[user_id] = state - if now - state.last_federation_update > FEDERATION_PING_INTERVAL: + if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: # Need to send ping to other servers to ensure they don't # timeout and set us to offline changes[user_id] = state @@ -328,7 +328,7 @@ class PresenceHandler(BaseHandler): # If there are have been no sync for a while (and none ongoing), # set presence to offline if not self.user_to_num_current_syncs.get(user_id, 0): - if now - state.last_user_sync > SYNC_ONLINE_TIMEOUT: + if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: changes[user_id] = state.copy_and_replace( state=PresenceState.OFFLINE, status_msg=None, @@ -337,7 +337,7 @@ class PresenceHandler(BaseHandler): # We expect to be poked occaisonally by the other side. # This is to protect against forgetful/buggy servers, so that # no one gets stuck online forever. - if now - state.last_federation_update > FEDERATION_TIMEOUT: + if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: # The other side seems to have disappeared. changes[user_id] = state.copy_and_replace( state=PresenceState.OFFLINE, @@ -356,7 +356,7 @@ class PresenceHandler(BaseHandler): prev_state = yield self.current_state_for_user(user_id) new_fields = { - "last_active": self.clock.time_msec(), + "last_active_ts": self.clock.time_msec(), } if prev_state.state == PresenceState.UNAVAILABLE: new_fields["state"] = PresenceState.ONLINE @@ -388,12 +388,12 @@ class PresenceHandler(BaseHandler): # just update the last sync times. yield self._update_states([prev_state.copy_and_replace( state=PresenceState.ONLINE, - last_active=self.clock.time_msec(), - last_user_sync=self.clock.time_msec(), + last_active_ts=self.clock.time_msec(), + last_user_sync_ts=self.clock.time_msec(), )]) else: yield self._update_states([prev_state.copy_and_replace( - last_user_sync=self.clock.time_msec(), + last_user_sync_ts=self.clock.time_msec(), )]) @defer.inlineCallbacks @@ -403,7 +403,7 @@ class PresenceHandler(BaseHandler): prev_state = yield self.current_state_for_user(user_id) yield self._update_states([prev_state.copy_and_replace( - last_user_sync=self.clock.time_msec(), + last_user_sync_ts=self.clock.time_msec(), )]) @contextmanager @@ -553,12 +553,12 @@ class PresenceHandler(BaseHandler): new_fields = { "state": presence_state, - "last_federation_update": now, + "last_federation_update_ts": now, } last_active_ago = push.get("last_active_ago", None) if last_active_ago is not None: - new_fields["last_active"] = now - last_active_ago + new_fields["last_active_ts"] = now - last_active_ago new_fields["status_msg"] = push.get("status_msg", None) @@ -632,7 +632,7 @@ class PresenceHandler(BaseHandler): } if presence == PresenceState.ONLINE: - new_fields["last_active"] = self.clock.time_msec() + new_fields["last_active_ts"] = self.clock.time_msec() yield self._update_states([prev_state.copy_and_replace(**new_fields)]) @@ -823,7 +823,7 @@ def should_notify(old_state, new_state): if new_state.currently_active != old_state.currently_active: return True - if new_state.last_active - old_state.last_active > LAST_ACTIVE_GRANULARITY: + if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. return True @@ -841,8 +841,8 @@ def _format_user_presence_state(state, now): "presence": state.state, "user_id": state.user_id, } - if state.last_active: - content["last_active_ago"] = now - state.last_active + if state.last_active_ts: + content["last_active_ago"] = now - state.last_active_ts if state.status_msg and state.state != PresenceState.OFFLINE: content["status_msg"] = state.status_msg if state.state == PresenceState.ONLINE: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8c3cf9e801..fcb968e8f4 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -203,8 +203,8 @@ class DataStore(RoomMemberStore, RoomStore, """ sql = ( - "SELECT user_id, state, last_active, last_federation_update," - " last_user_sync, status_msg, currently_active FROM presence_stream" + "SELECT user_id, state, last_active_ts, last_federation_update_ts," + " last_user_sync_ts, status_msg, currently_active FROM presence_stream" " WHERE state != ?" ) sql = self.database_engine.convert_param_style(sql) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index b133979102..70ece56548 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -22,8 +22,9 @@ from twisted.internet import defer class UserPresenceState(namedtuple("UserPresenceState", - ("user_id", "state", "last_active", "last_federation_update", - "last_user_sync", "status_msg", "currently_active"))): + ("user_id", "state", "last_active_ts", + "last_federation_update_ts", "last_user_sync_ts", + "status_msg", "currently_active"))): """Represents the current presence state of the user. user_id (str) @@ -46,9 +47,9 @@ class UserPresenceState(namedtuple("UserPresenceState", return cls( user_id=user_id, state=PresenceState.OFFLINE, - last_active=0, - last_federation_update=0, - last_user_sync=0, + last_active_ts=0, + last_federation_update_ts=0, + last_user_sync_ts=0, status_msg=None, currently_active=False, ) @@ -82,9 +83,9 @@ class PresenceStore(SQLBaseStore): "stream_id": stream_id, "user_id": state.user_id, "state": state.state, - "last_active": state.last_active, - "last_federation_update": state.last_federation_update, - "last_user_sync": state.last_user_sync, + "last_active_ts": state.last_active_ts, + "last_federation_update_ts": state.last_federation_update_ts, + "last_user_sync_ts": state.last_user_sync_ts, "status_msg": state.status_msg, "currently_active": state.currently_active, } @@ -121,9 +122,9 @@ class PresenceStore(SQLBaseStore): retcols=( "user_id", "state", - "last_active", - "last_federation_update", - "last_user_sync", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", "status_msg", "currently_active", ), diff --git a/synapse/storage/schema/delta/30/presence_stream.sql b/synapse/storage/schema/delta/30/presence_stream.sql index 14f5e3d30a..606bbb037d 100644 --- a/synapse/storage/schema/delta/30/presence_stream.sql +++ b/synapse/storage/schema/delta/30/presence_stream.sql @@ -18,9 +18,9 @@ stream_id BIGINT, user_id TEXT, state TEXT, - last_active BIGINT, - last_federation_update BIGINT, - last_user_sync BIGINT, + last_active_ts BIGINT, + last_federation_update_ts BIGINT, + last_user_sync_ts BIGINT, status_msg TEXT, currently_active BOOLEAN ); -- cgit 1.5.1 From 42109a62a43ca0b8c1e0d3f797bbc70e0018ca5c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Feb 2016 16:37:28 +0000 Subject: Remove unused param from get_max_token --- synapse/storage/account_data.py | 4 ++-- synapse/storage/events.py | 2 +- synapse/storage/receipts.py | 6 +++--- synapse/storage/stream.py | 2 +- synapse/storage/tags.py | 6 +++--- synapse/storage/util/id_generators.py | 4 +--- 6 files changed, 11 insertions(+), 13 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index b8387fc500..91cbf399b6 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -168,7 +168,7 @@ class AccountDataStore(SQLBaseStore): "add_room_account_data", add_account_data_txn, next_id ) - result = yield self._account_data_id_gen.get_max_token(self) + result = yield self._account_data_id_gen.get_max_token() defer.returnValue(result) @defer.inlineCallbacks @@ -207,7 +207,7 @@ class AccountDataStore(SQLBaseStore): "add_user_account_data", add_account_data_txn, next_id ) - result = yield self._account_data_id_gen.get_max_token(self) + result = yield self._account_data_id_gen.get_max_token() defer.returnValue(result) def _update_max_stream_id(self, txn, next_id): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3a5c6ee4b1..1dd3236829 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -131,7 +131,7 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass - max_persisted_id = yield self._stream_id_gen.get_max_token(self) + max_persisted_id = yield self._stream_id_gen.get_max_token() defer.returnValue((stream_ordering, max_persisted_id)) @defer.inlineCallbacks diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 4202a6b3dc..a7343c97f7 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore): super(ReceiptsStore, self).__init__(hs) self._receipts_stream_cache = StreamChangeCache( - "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None) + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token() ) @cached(num_args=2) @@ -222,7 +222,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(results) def get_max_receipt_stream_id(self): - return self._receipts_id_gen.get_max_token(self) + return self._receipts_id_gen.get_max_token() def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): @@ -347,7 +347,7 @@ class ReceiptsStore(SQLBaseStore): room_id, receipt_type, user_id, event_ids, data ) - max_persisted_id = yield self._stream_id_gen.get_max_token(self) + max_persisted_id = yield self._stream_id_gen.get_max_token() defer.returnValue((stream_id, max_persisted_id)) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c236dafafb..8908d5b5da 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -531,7 +531,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): - token = yield self._stream_id_gen.get_max_token(self) + token = yield self._stream_id_gen.get_max_token() if direction != 'b': defer.returnValue("s%d" % (token,)) else: diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index e1a9c0c261..9551aa9739 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -30,7 +30,7 @@ class TagsStore(SQLBaseStore): Returns: A deferred int. """ - return self._account_data_id_gen.get_max_token(self) + return self._account_data_id_gen.get_max_token() @cached() def get_tags_for_user(self, user_id): @@ -147,7 +147,7 @@ class TagsStore(SQLBaseStore): self.get_tags_for_user.invalidate((user_id,)) - result = yield self._account_data_id_gen.get_max_token(self) + result = yield self._account_data_id_gen.get_max_token() defer.returnValue(result) @defer.inlineCallbacks @@ -169,7 +169,7 @@ class TagsStore(SQLBaseStore): self.get_tags_for_user.invalidate((user_id,)) - result = yield self._account_data_id_gen.get_max_token(self) + result = yield self._account_data_id_gen.get_max_token() defer.returnValue(result) def _update_revision_txn(self, txn, user_id, room_id, next_id): diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 5ce54f76de..ef5e4a4668 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -130,11 +130,9 @@ class StreamIdGenerator(object): return manager() - def get_max_token(self, *args): + def get_max_token(self): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. - - Used to take a DataStore param, which is no longer needed. """ with self._lock: if self._unfinished_ids: -- cgit 1.5.1 From e6c5e3f28a234be342cf5e4f15a925e65e82fd0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Feb 2016 16:39:28 +0000 Subject: Close cursor --- synapse/storage/__init__.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index fcb968e8f4..9be1d12fac 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -212,6 +212,7 @@ class DataStore(RoomMemberStore, RoomStore, txn = db_conn.cursor() txn.execute(sql, (PresenceState.OFFLINE,)) rows = self.cursor_to_dict(txn) + txn.close() for row in rows: row["currently_active"] = bool(row["currently_active"]) -- cgit 1.5.1