diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e843b702b9..ba88a54979 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -15,51 +15,48 @@
# limitations under the License.
import datetime
-from dateutil import tz
-import time
import logging
+import time
+from dateutil import tz
+
+from synapse.api.constants import PresenceState
from synapse.storage.devices import DeviceStore
from synapse.storage.user_erasure_store import UserErasureStore
-from .appservice import (
- ApplicationServiceStore, ApplicationServiceTransactionStore
-)
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from .account_data import AccountDataStore
+from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
+from .client_ips import ClientIpStore
+from .deviceinbox import DeviceInboxStore
from .directory import DirectoryStore
+from .end_to_end_keys import EndToEndKeyStore
+from .engines import PostgresEngine
+from .event_federation import EventFederationStore
+from .event_push_actions import EventPushActionsStore
from .events import EventsStore
+from .filtering import FilteringStore
+from .group_server import GroupServerStore
+from .keys import KeyStore
+from .media_repository import MediaRepositoryStore
+from .openid import OpenIdStore
from .presence import PresenceStore, UserPresenceState
from .profile import ProfileStore
+from .push_rule import PushRuleStore
+from .pusher import PusherStore
+from .receipts import ReceiptsStore
from .registration import RegistrationStore
+from .rejections import RejectionsStore
from .room import RoomStore
from .roommember import RoomMemberStore
-from .stream import StreamStore
-from .transactions import TransactionStore
-from .keys import KeyStore
-from .event_federation import EventFederationStore
-from .pusher import PusherStore
-from .push_rule import PushRuleStore
-from .media_repository import MediaRepositoryStore
-from .rejections import RejectionsStore
-from .event_push_actions import EventPushActionsStore
-from .deviceinbox import DeviceInboxStore
-from .group_server import GroupServerStore
-from .state import StateStore
-from .signatures import SignatureStore
-from .filtering import FilteringStore
-from .end_to_end_keys import EndToEndKeyStore
-
-from .receipts import ReceiptsStore
from .search import SearchStore
+from .signatures import SignatureStore
+from .state import StateStore
+from .stream import StreamStore
from .tags import TagsStore
-from .account_data import AccountDataStore
-from .openid import OpenIdStore
-from .client_ips import ClientIpStore
+from .transactions import TransactionStore
from .user_directory import UserDirectoryStore
-
-from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
-from .engines import PostgresEngine
-
-from synapse.api.constants import PresenceState
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerator
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 22d6257a9f..98dde77431 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -13,22 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import sys
+import threading
+import time
-from synapse.api.errors import StoreError
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-from synapse.util.caches.descriptors import Cache
-from synapse.storage.engines import PostgresEngine
+from six import iteritems, iterkeys, itervalues
+from six.moves import intern, range
from prometheus_client import Histogram
from twisted.internet import defer
-import sys
-import time
-import threading
-
-from six import itervalues, iterkeys, iteritems
-from six.moves import intern, range
+from synapse.api.errors import StoreError
+from synapse.storage.engines import PostgresEngine
+from synapse.util.caches.descriptors import Cache
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
@@ -221,7 +220,7 @@ class SQLBaseStore(object):
self._clock.looping_call(loop, 10000)
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
- logging_context, func, *args, **kwargs):
+ func, *args, **kwargs):
start = time.time()
txn_id = self._TXN_ID
@@ -285,8 +284,7 @@ class SQLBaseStore(object):
end = time.time()
duration = end - start
- if logging_context is not None:
- logging_context.add_database_transaction(duration)
+ LoggingContext.current_context().add_database_transaction(duration)
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
@@ -310,19 +308,15 @@ class SQLBaseStore(object):
Returns:
Deferred: The result of func
"""
- current_context = LoggingContext.current_context()
-
after_callbacks = []
exception_callbacks = []
- def inner_func(conn, *args, **kwargs):
- return self._new_transaction(
- conn, desc, after_callbacks, exception_callbacks, current_context,
- func, *args, **kwargs
- )
-
try:
- result = yield self.runWithConnection(inner_func, *args, **kwargs)
+ result = yield self.runWithConnection(
+ self._new_transaction,
+ desc, after_callbacks, exception_callbacks, func,
+ *args, **kwargs
+ )
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
@@ -347,22 +341,25 @@ class SQLBaseStore(object):
Returns:
Deferred: The result of func
"""
- current_context = LoggingContext.current_context()
+ parent_context = LoggingContext.current_context()
+ if parent_context == LoggingContext.sentinel:
+ logger.warn(
+ "Running db txn from sentinel context: metrics will be lost",
+ )
+ parent_context = None
start_time = time.time()
def inner_func(conn, *args, **kwargs):
- with LoggingContext("runWithConnection") as context:
+ with LoggingContext("runWithConnection", parent_context) as context:
sched_duration_sec = time.time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
- current_context.add_database_scheduled(sched_duration_sec)
+ context.add_database_scheduled(sched_duration_sec)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
- current_context.copy_to(context)
-
return func(conn, *args, **kwargs)
with PreserveLoggingContext():
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 7034a61399..bbc3355c73 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -14,18 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import abc
+import logging
+
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.util.id_generators import StreamIdGenerator
-
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-
-from canonicaljson import json
-
-import abc
-import logging
+from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 4d32d0bdf6..9f12b360bc 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -15,14 +15,16 @@
# limitations under the License.
import logging
import re
-from twisted.internet import defer
+
from canonicaljson import json
+from twisted.internet import defer
+
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.storage.events import EventsWorkerStore
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index af18964510..dc9eca7d15 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from . import engines
-
-from twisted.internet import defer
+import logging
from canonicaljson import json
-import logging
+from twisted.internet import defer
+
+from . import engines
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 968d2fed22..b78eda3413 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,15 +15,14 @@
import logging
-from twisted.internet import defer
+from six import iteritems
-from ._base import Cache
-from . import background_updates
+from twisted.internet import defer
from synapse.util.caches import CACHE_SIZE_FACTOR
-from six import iteritems
-
+from . import background_updates
+from ._base import Cache
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 38addbf9c0..73646da025 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -19,10 +19,9 @@ from canonicaljson import json
from twisted.internet import defer
-from .background_updates import BackgroundUpdateStore
-
from synapse.util.caches.expiringcache import ExpiringCache
+from .background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 2ed9ada783..ec68e39f1e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -14,15 +14,16 @@
# limitations under the License.
import logging
+from six import iteritems, itervalues
+
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import StoreError
-from ._base import SQLBaseStore, Cache
-from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
-
-from canonicaljson import json
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from six import itervalues, iteritems
+from ._base import Cache, SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index d0c0059757..808194236a 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -13,15 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached
-
-from synapse.api.errors import SynapseError
+from collections import namedtuple
from twisted.internet import defer
-from collections import namedtuple
+from synapse.api.errors import SynapseError
+from synapse.util.caches.descriptors import cached
+from ._base import SQLBaseStore
RoomAliasMapping = namedtuple(
"RoomAliasMapping",
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 181047c8b7..7ae5c65482 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -12,16 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from six import iteritems
+
+from canonicaljson import encode_canonical_json, json
+
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
-from canonicaljson import encode_canonical_json, json
-
from ._base import SQLBaseStore
-from six import iteritems
-
class EndToEndKeyStore(SQLBaseStore):
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 8c868ece75..e2f9de8451 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -13,13 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import IncorrectDatabaseSetup
-from .postgres import PostgresEngine
-from .sqlite3 import Sqlite3Engine
-
import importlib
import platform
+from ._base import IncorrectDatabaseSetup
+from .postgres import PostgresEngine
+from .sqlite3 import Sqlite3Engine
SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 60f0fa7fb3..19949fc474 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import prepare_database
-
import struct
import threading
+from synapse.storage.prepare_database import prepare_database
+
class Sqlite3Engine(object):
single_threaded = True
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8fbf7ffba7..8d366d1b91 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,23 +12,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import random
+from six.moves import range
+from six.moves.queue import Empty, PriorityQueue
+
+from unpaddedbase64 import encode_base64
+
from twisted.internet import defer
+from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
-
-from synapse.api.errors import StoreError
from synapse.util.caches.descriptors import cached
-from unpaddedbase64 import encode_base64
-
-import logging
-from six.moves.queue import PriorityQueue, Empty
-
-from six.moves import range
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 05cb3f61ce..29b511ae5e 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,15 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage._base import SQLBaseStore, LoggingTransaction
-from twisted.internet import defer
-from synapse.util.caches.descriptors import cachedInlineCallbacks
-
import logging
+from six import iteritems
+
from canonicaljson import json
-from six import iteritems
+from twisted.internet import defer
+
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.util.caches.descriptors import cachedInlineCallbacks
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a54abb9edd..2aaab0d02c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,37 +14,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import OrderedDict, deque, namedtuple
-from functools import wraps
import itertools
import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+
+from six import iteritems, itervalues
+from six.moves import range
from canonicaljson import json
+from prometheus_client import Counter
from twisted.internet import defer
+import synapse.metrics
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
+from synapse.events.snapshot import EventContext # noqa: F401
from synapse.storage.events_worker import EventsWorkerStore
+from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import (
- PreserveLoggingContext, make_deferred_yieldable,
-)
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id, RoomStreamToken
-import synapse.metrics
-
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
-
-from six.moves import range
-from six import itervalues, iteritems
-
-from prometheus_client import Counter
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 896225aab9..67433606c6 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,29 +12,28 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+import logging
+from collections import namedtuple
+
+from canonicaljson import json
from twisted.internet import defer
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
-
from synapse.util.logcontext import (
- PreserveLoggingContext, make_deferred_yieldable, run_in_background,
LoggingContext,
+ PreserveLoggingContext,
+ make_deferred_yieldable,
+ run_in_background,
)
from synapse.util.metrics import Measure
-from synapse.api.errors import SynapseError
-
-from collections import namedtuple
-
-import logging
-
-from canonicaljson import json
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -223,32 +222,47 @@ class EventsWorkerStore(SQLBaseStore):
"""Takes a database connection and waits for requests for events from
the _event_fetch_list queue.
"""
- event_list = []
i = 0
while True:
- try:
- with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
- single_threaded = self.database_engine.single_threaded
- if single_threaded or i > EVENT_QUEUE_ITERATIONS:
- self._event_fetch_ongoing -= 1
- return
- else:
- self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
- i += 1
- continue
- i = 0
+ with self._event_fetch_lock:
+ event_list = self._event_fetch_list
+ self._event_fetch_list = []
+
+ if not event_list:
+ single_threaded = self.database_engine.single_threaded
+ if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+ self._event_fetch_ongoing -= 1
+ return
+ else:
+ self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+ i += 1
+ continue
+ i = 0
+
+ self._fetch_event_list(conn, event_list)
+
+ def _fetch_event_list(self, conn, event_list):
+ """Handle a load of requests from the _event_fetch_list queue
+ Args:
+ conn (twisted.enterprise.adbapi.Connection): database connection
+
+ event_list (list[Tuple[list[str], Deferred]]):
+ The fetch requests. Each entry consists of a list of event
+ ids to be fetched, and a deferred to be completed once the
+ events have been fetched.
+
+ """
+ with Measure(self._clock, "_fetch_event_list"):
+ try:
event_id_lists = zip(*event_list)[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
rows = self._new_transaction(
- conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
+ conn, "do_fetch", [], [],
+ self._fetch_event_rows, event_ids,
)
row_dict = {
@@ -281,9 +295,8 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext():
d.errback(e)
- if event_list:
- with PreserveLoggingContext():
- self.hs.get_reactor().callFromThread(fire, event_list)
+ with PreserveLoggingContext():
+ self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index eae6027cee..2d5896c5b4 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -13,13 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from canonicaljson import encode_canonical_json, json
+
from twisted.internet import defer
-from ._base import SQLBaseStore
-from synapse.api.errors import SynapseError, Codes
+from synapse.api.errors import Codes, SynapseError
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from canonicaljson import encode_canonical_json, json
+from ._base import SQLBaseStore
class FilteringStore(SQLBaseStore):
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index b77402d295..592d1b4c2a 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -14,15 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import SynapseError
from ._base import SQLBaseStore
-from canonicaljson import json
-
-
# The category ID for the "default" category. We don't store as null in the
# database to avoid the fun of null != null
_DEFAULT_CATEGORY_ID = ""
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 0f13b61da8..f547977600 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -13,17 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+import hashlib
+import logging
-from twisted.internet import defer
import six
-import OpenSSL
from signedjson.key import decode_verify_key_bytes
-import hashlib
-import logging
+import OpenSSL
+from twisted.internet import defer
+
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index cf2aae0468..b290f834b3 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -20,7 +20,6 @@ import logging
import os
import re
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index f05d91cc58..a0c7a0dc87 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -13,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+from collections import namedtuple
+
+from twisted.internet import defer
+
from synapse.api.constants import PresenceState
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util import batch_iter
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from collections import namedtuple
-from twisted.internet import defer
+from ._base import SQLBaseStore
class UserPresenceState(namedtuple("UserPresenceState",
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 8612bd5ecc..60295da254 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -15,8 +15,8 @@
from twisted.internet import defer
-from synapse.storage.roommember import ProfileInfo
from synapse.api.errors import StoreError
+from synapse.storage.roommember import ProfileInfo
from ._base import SQLBaseStore
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9e52e992b3..be655d287b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,21 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+import abc
+import logging
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes
+from synapse.push.baserules import list_with_base_rules
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
from synapse.storage.receipts import ReceiptsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.push.baserules import list_with_base_rules
-from synapse.api.constants import EventTypes
-from twisted.internet import defer
-from canonicaljson import json
-
-import abc
-import logging
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c6def861cf..cc273a57b2 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -14,15 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from twisted.internet import defer
+import logging
+import types
from canonicaljson import encode_canonical_json, json
+from twisted.internet import defer
+
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
-import logging
-import types
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index f230a3bab7..0ac665e967 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -14,18 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from .util.id_generators import StreamIdGenerator
-from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from twisted.internet import defer
+import abc
+import logging
from canonicaljson import json
-import abc
-import logging
+from twisted.internet import defer
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import SQLBaseStore
+from .util.id_generators import StreamIdGenerator
logger = logging.getLogger(__name__)
@@ -140,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
"""
room_ids = set(room_ids)
- if from_key:
+ if from_key is not None:
+ # Only ask the database about rooms where there have been new
+ # receipts added since `from_key`
room_ids = yield self._receipts_stream_cache.get_entities_changed(
room_ids, from_key
)
@@ -151,7 +153,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
defer.returnValue([ev for res in results.values() for ev in res])
- @cachedInlineCallbacks(num_args=3, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
@@ -162,7 +163,19 @@ class ReceiptsWorkerStore(SQLBaseStore):
from the start.
Returns:
- list: A list of receipts.
+ Deferred[list]: A list of receipts.
+ """
+ if from_key is not None:
+ # Check the cache first to see if any new receipts have been added
+ # since`from_key`. If not we can no-op.
+ if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
+ defer.succeed([])
+
+ return self._get_linearized_receipts_for_room(room_id, to_key, from_key)
+
+ @cachedInlineCallbacks(num_args=3, tree=True)
+ def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
+ """See get_linearized_receipts_for_room
"""
def f(txn):
if from_key:
@@ -211,7 +224,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"content": content,
}])
- @cachedList(cached_method_name="get_linearized_receipts_for_room",
+ @cachedList(cached_method_name="_get_linearized_receipts_for_room",
list_name="room_ids", num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
@@ -373,7 +386,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+ txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(
self._receipts_stream_cache.entity_has_changed,
@@ -493,7 +506,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+ txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
self._simple_delete_txn(
txn,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 0d18f6d869..07333f777d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,15 +15,15 @@
import re
+from six.moves import range
+
from twisted.internet import defer
-from synapse.api.errors import StoreError, Codes
+from synapse.api.errors import Codes, StoreError
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from six.moves import range
-
class RegistrationWorkerStore(SQLBaseStore):
@cached()
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
index 40acb5c4ed..880f047adb 100644
--- a/synapse/storage/rejections.py
+++ b/synapse/storage/rejections.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-
import logging
+from ._base import SQLBaseStore
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index ca0eb187e5..3147fb6827 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -13,6 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
+import logging
+import re
+
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import StoreError
@@ -20,12 +26,6 @@ from synapse.storage._base import SQLBaseStore
from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from canonicaljson import json
-
-import collections
-import logging
-import re
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8fc9549a75..02a802bed9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -14,24 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
+import logging
from collections import namedtuple
+from six import iteritems, itervalues
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
from synapse.storage.events import EventsWorkerStore
+from synapse.types import get_domain_from_id
from synapse.util.async import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.stringutils import to_ascii
-from synapse.api.constants import Membership, EventTypes
-from synapse.types import get_domain_from_id
-
-import logging
-from canonicaljson import json
-
-from six import itervalues, iteritems
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index e7351c3ae6..4b2ffd35fd 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -14,11 +14,11 @@
import logging
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-
import simplejson
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py
index 6df57b5206..414f9f5aa0 100644
--- a/synapse/storage/schema/delta/27/ts.py
+++ b/synapse/storage/schema/delta/27/ts.py
@@ -14,10 +14,10 @@
import logging
-from synapse.storage.prepare_database import get_statements
-
import simplejson
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 85bd1a2006..ef7ec34346 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from synapse.config.appservice import load_appservices
from six.moves import range
+from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index fe6b7d196d..7d8ca5f93f 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.prepare_database import get_statements
-
import logging
+
import simplejson
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/33/event_fields.py b/synapse/storage/schema/delta/33/event_fields.py
index 1e002f9db2..bff1256a7b 100644
--- a/synapse/storage/schema/delta/33/event_fields.py
+++ b/synapse/storage/schema/delta/33/event_fields.py
@@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import get_statements
-
import logging
+
import simplejson
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
index 55ae43f395..9754d3ccfb 100644
--- a/synapse/storage/schema/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -14,7 +14,6 @@
import time
-
ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
index 3b63a1562d..cf09e43e2b 100644
--- a/synapse/storage/schema/delta/34/cache_stream.py
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py
index 033144341c..67d505e68b 100644
--- a/synapse/storage/schema/delta/34/received_txn_purge.py
+++ b/synapse/storage/schema/delta/34/received_txn_purge.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/34/sent_txn_purge.py b/synapse/storage/schema/delta/34/sent_txn_purge.py
index 81948e3431..0ffab10b6f 100644
--- a/synapse/storage/schema/delta/34/sent_txn_purge.py
+++ b/synapse/storage/schema/delta/34/sent_txn_purge.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py
index 20ad8bd5a6..a377884169 100644
--- a/synapse/storage/schema/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
DROP_INDICES = """
diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py
index ea6a18196d..506f326f4d 100644
--- a/synapse/storage/schema/delta/42/user_dir.py
+++ b/synapse/storage/schema/delta/42/user_dir.py
@@ -14,8 +14,8 @@
import logging
-from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 9b77c45318..d5b5df93e6 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -13,19 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import namedtuple
import logging
import re
-from canonicaljson import json
+from collections import namedtuple
from six import string_types
+from canonicaljson import json
+
from twisted.internet import defer
-from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from .background_updates import BackgroundUpdateStore
+
logger = logging.getLogger(__name__)
SearchEntry = namedtuple('SearchEntry', [
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 25922e5a9c..470212aa2a 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
import six
-from ._base import SQLBaseStore
-
from unpaddedbase64 import encode_base64
+
+from twisted.internet import defer
+
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList
+from ._base import SQLBaseStore
+
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index cd9821c270..89a05c4618 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import namedtuple
import logging
+from collections import namedtuple
from six import iteritems, itervalues
from six.moves import range
@@ -23,10 +23,11 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, get_cache_factor_for
+from synapse.util.caches import get_cache_factor_for, intern_string
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
+
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..66856342f0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -33,22 +33,20 @@ what sort order was used:
and stream ordering columns respectively.
"""
+import abc
+import logging
+from collections import namedtuple
+
+from six.moves import range
+
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
from synapse.storage.events import EventsWorkerStore
-
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.storage.engines import PostgresEngine
-
-import abc
-import logging
-
-from six.moves import range
-from collections import namedtuple
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 04d123ed95..0f657b2bd3 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -14,16 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.account_data import AccountDataWorkerStore
+import logging
-from synapse.util.caches.descriptors import cached
-from twisted.internet import defer
+from six.moves import range
from canonicaljson import json
-import logging
+from twisted.internet import defer
-from six.moves import range
+from synapse.storage.account_data import AccountDataWorkerStore
+from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index acbc03446e..c3bc94f56d 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,17 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached
+import logging
+from collections import namedtuple
-from twisted.internet import defer
import six
from canonicaljson import encode_canonical_json, json
-from collections import namedtuple
+from twisted.internet import defer
-import logging
+from synapse.util.caches.descriptors import cached
+
+from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 275c299998..a8781b0e5d 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -13,19 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
+import re
-from ._base import SQLBaseStore
+from six import iteritems
+
+from twisted.internet import defer
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from six import iteritems
-
-import re
-import logging
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -265,7 +265,7 @@ class UserDirectoryStore(SQLBaseStore):
self.get_user_in_public_room.invalidate((user_id,))
def get_users_in_public_due_to_room(self, room_id):
- """Get all user_ids that are in the room directory becuase they're
+ """Get all user_ids that are in the room directory because they're
in the given room_id
"""
return self._simple_select_onecol(
@@ -277,7 +277,7 @@ class UserDirectoryStore(SQLBaseStore):
@defer.inlineCallbacks
def get_users_in_dir_due_to_room(self, room_id):
- """Get all user_ids that are in the room directory becuase they're
+ """Get all user_ids that are in the room directory because they're
in the given room_id
"""
user_ids_dir = yield self._simple_select_onecol(
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
index 47bfc01e84..be013f4427 100644
--- a/synapse/storage/user_erasure_store.py
+++ b/synapse/storage/user_erasure_store.py
@@ -17,7 +17,7 @@ import operator
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedList, cached
+from synapse.util.caches.descriptors import cached, cachedList
class UserErasureWorkerStore(SQLBaseStore):
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 95031dc9ec..d6160d5e4d 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import deque
import contextlib
import threading
+from collections import deque
class IdGenerator(object):
|