diff --git a/res/templates/notif.html b/res/templates/notif.html
index 834840861e..88b921ca9c 100644
--- a/res/templates/notif.html
+++ b/res/templates/notif.html
@@ -17,11 +17,15 @@
</td>
<td class="message_contents">
{% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
- <div class="sender_name">{{ message.sender_name }}</div>
+ <div class="sender_name">{% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}</div>
{% endif %}
<div class="message_body">
{% if message.msgtype == "m.text" %}
{{ message.body_text_html }}
+ {% elif message.msgtype == "m.emote" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.notice" %}
+ {{ message.body_text_html }}
{% elif message.msgtype == "m.image" %}
<img src="{{ message.image_url|mxc_to_http(640, 480, scale) }}" />
{% elif message.msgtype == "m.file" %}
diff --git a/res/templates/notif.txt b/res/templates/notif.txt
index a3ddac80ce..a37bee9833 100644
--- a/res/templates/notif.txt
+++ b/res/templates/notif.txt
@@ -1,7 +1,11 @@
{% for message in notif.messages %}
-{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
+{% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
{% if message.msgtype == "m.text" %}
{{ message.body_text_plain }}
+{% elif message.msgtype == "m.emote" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.notice" %}
+{{ message.body_text_plain }}
{% elif message.msgtype == "m.image" %}
{{ message.body_text_plain }}
{% elif message.msgtype == "m.file" %}
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 47a4e9f864..9afc8fd754 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -56,22 +56,22 @@ import logging
logger = logging.getLogger(__name__)
-class AppServiceScheduler(object):
+class ApplicationServiceScheduler(object):
""" Public facing API for this module. Does the required DI to tie the
components together. This also serves as the "event_pool", which in this
case is a simple array.
"""
- def __init__(self, clock, store, as_api):
- self.clock = clock
- self.store = store
- self.as_api = as_api
+ def __init__(self, hs):
+ self.clock = hs.get_clock()
+ self.store = hs.get_datastore()
+ self.as_api = hs.get_application_service_api()
def create_recoverer(service, callback):
- return _Recoverer(clock, store, as_api, service, callback)
+ return _Recoverer(self.clock, self.store, self.as_api, service, callback)
self.txn_ctrl = _TransactionController(
- clock, store, as_api, create_recoverer
+ self.clock, self.store, self.as_api, create_recoverer
)
self.queuer = _ServiceQueuer(self.txn_ctrl)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0b5f462e44..c2d8f8a52f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -29,6 +29,7 @@ class ServerConfig(Config):
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
self.public_baseurl = config.get("public_baseurl")
+ self.secondary_directory_servers = config.get("secondary_directory_servers", [])
if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/':
@@ -156,6 +157,15 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
+ # A list of other Home Servers to fetch the public room directory from
+ # and include in the public room directory of this home server
+ # This is a temporary stopgap solution to populate new server with a
+ # list of rooms until there exists a good solution of a decentralized
+ # room directory.
+ # secondary_directory_servers:
+ # - matrix.org
+ # - vector.im
+
# List of ports that Synapse should listen on, their purpose and their
# configuration.
listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 37ee469fa2..d835c1b038 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -24,6 +24,7 @@ from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
@@ -551,6 +552,25 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
+ def get_public_rooms(self, destinations):
+ results_by_server = {}
+
+ @defer.inlineCallbacks
+ def _get_result(s):
+ if s == self.server_name:
+ defer.returnValue()
+
+ try:
+ result = yield self.transport_layer.get_public_rooms(s)
+ results_by_server[s] = result
+ except:
+ logger.exception("Error getting room list from server %r", s)
+
+ yield concurrently_execute(_get_result, destinations, 3)
+
+ defer.returnValue(results_by_server)
+
+ @defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):
"""
Params:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index cd2841c4db..ebb698e278 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -226,6 +226,18 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
+ def get_public_rooms(self, remote_server):
+ path = PREFIX + "/publicRooms"
+
+ response = yield self.client.get_json(
+ destination=remote_server,
+ path=path,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
def exchange_third_party_invite(self, destination, room_id, event_dict):
path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 5b6c7d11dd..a1a334955f 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -134,10 +134,12 @@ class Authenticator(object):
class BaseFederationServlet(object):
- def __init__(self, handler, authenticator, ratelimiter, server_name):
+ def __init__(self, handler, authenticator, ratelimiter, server_name,
+ room_list_handler):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
+ self.room_list_handler = room_list_handler
def _wrap(self, code):
authenticator = self.authenticator
@@ -492,6 +494,50 @@ class OpenIdUserInfo(BaseFederationServlet):
return code
+class PublicRoomList(BaseFederationServlet):
+ """
+ Fetch the public room list for this server.
+
+ This API returns information in the same format as /publicRooms on the
+ client API, but will only ever include local public rooms and hence is
+ intended for consumption by other home servers.
+
+ GET /publicRooms HTTP/1.1
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "chunk": [
+ {
+ "aliases": [
+ "#test:localhost"
+ ],
+ "guest_can_join": false,
+ "name": "test room",
+ "num_joined_members": 3,
+ "room_id": "!whkydVegtvatLfXmPN:localhost",
+ "world_readable": false
+ }
+ ],
+ "end": "END",
+ "start": "START"
+ }
+ """
+
+ PATH = "/publicRooms"
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ data = yield self.room_list_handler.get_local_public_room_list()
+ defer.returnValue((200, data))
+
+ # Avoid doing remote HS authorization checks which are done by default by
+ # BaseFederationServlet.
+ def _wrap(self, code):
+ return code
+
+
SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -513,6 +559,7 @@ SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
OpenIdUserInfo,
+ PublicRoomList,
)
@@ -523,4 +570,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
+ room_list_handler=hs.get_room_list_handler(),
).register(resource)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 9442ae6f1d..c0069e23d6 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,11 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.appservice.scheduler import AppServiceScheduler
-from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler
from .room import (
- RoomCreationHandler, RoomListHandler, RoomContextHandler,
+ RoomCreationHandler, RoomContextHandler,
)
from .room_member import RoomMemberHandler
from .message import MessageHandler
@@ -26,7 +24,6 @@ from .federation import FederationHandler
from .profile import ProfileHandler
from .directory import DirectoryHandler
from .admin import AdminHandler
-from .appservice import ApplicationServicesHandler
from .auth import AuthHandler
from .identity import IdentityHandler
from .receipts import ReceiptsHandler
@@ -50,18 +47,9 @@ class Handlers(object):
self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
- self.room_list_handler = RoomListHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.receipts_handler = ReceiptsHandler(hs)
- asapi = ApplicationServiceApi(hs)
- self.appservice_handler = ApplicationServicesHandler(
- hs, asapi, AppServiceScheduler(
- clock=hs.get_clock(),
- store=hs.get_datastore(),
- as_api=asapi
- )
- )
self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..051ccdb380 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
-from synapse.types import UserID
import logging
@@ -35,16 +34,13 @@ def log_failure(failure):
)
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
class ApplicationServicesHandler(object):
- def __init__(self, hs, appservice_api, appservice_scheduler):
+ def __init__(self, hs):
self.store = hs.get_datastore()
- self.hs = hs
- self.appservice_api = appservice_api
- self.scheduler = appservice_scheduler
+ self.is_mine_id = hs.is_mine_id
+ self.appservice_api = hs.get_application_service_api()
+ self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
@defer.inlineCallbacks
@@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
@defer.inlineCallbacks
def _is_unknown_user(self, user_id):
- user = UserID.from_string(user_id)
- if not self.hs.is_mine(user):
+ if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
defer.returnValue(False)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 68d0d78fc6..26c865e171 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.types import UserID
-from synapse.api.errors import AuthError, LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
from synapse.util.async import run_on_reactor
from twisted.web.client import PartialDownloadError
@@ -563,7 +563,12 @@ class AuthHandler(BaseHandler):
except_access_token_ids = [requester.access_token_id] if requester else []
- yield self.store.user_set_password_hash(user_id, password_hash)
+ try:
+ yield self.store.user_set_password_hash(user_id, password_hash)
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+ raise e
yield self.store.user_delete_access_tokens(
user_id, except_access_token_ids
)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8eeb225811..4bea7f2b19 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
super(DirectoryHandler, self).__init__(hs)
self.state = hs.get_state_handler()
+ self.appservice_handler = hs.get_application_service_handler()
self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
@@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
)
if not result:
# Query AS to see if it exists
- as_handler = self.hs.get_handlers().appservice_handler
+ as_handler = self.appservice_handler
result = yield as_handler.query_room_alias_exists(room_alias)
defer.returnValue(result)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3d63b3c513..9fd34588dd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -36,6 +36,8 @@ import string
logger = logging.getLogger(__name__)
+REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
+
id_server_scheme = "https://"
@@ -344,8 +346,14 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache()
+ self.remote_list_request_cache = ResponseCache()
+ self.remote_list_cache = {}
+ self.fetch_looping_call = hs.get_clock().looping_call(
+ self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
+ )
+ self.fetch_all_remote_lists()
- def get_public_room_list(self):
+ def get_local_public_room_list(self):
result = self.response_cache.get(())
if not result:
result = self.response_cache.set((), self._get_public_room_list())
@@ -427,6 +435,55 @@ class RoomListHandler(BaseHandler):
# FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": results})
+ @defer.inlineCallbacks
+ def fetch_all_remote_lists(self):
+ deferred = self.hs.get_replication_layer().get_public_rooms(
+ self.hs.config.secondary_directory_servers
+ )
+ self.remote_list_request_cache.set((), deferred)
+ self.remote_list_cache = yield deferred
+
+ @defer.inlineCallbacks
+ def get_aggregated_public_room_list(self):
+ """
+ Get the public room list from this server and the servers
+ specified in the secondary_directory_servers config option.
+ XXX: Pagination...
+ """
+ # We return the results from out cache which is updated by a looping call,
+ # unless we're missing a cache entry, in which case wait for the result
+ # of the fetch if there's one in progress. If not, omit that server.
+ wait = False
+ for s in self.hs.config.secondary_directory_servers:
+ if s not in self.remote_list_cache:
+ logger.warn("No cached room list from %s: waiting for fetch", s)
+ wait = True
+ break
+
+ if wait and self.remote_list_request_cache.get(()):
+ yield self.remote_list_request_cache.get(())
+
+ public_rooms = yield self.get_local_public_room_list()
+
+ # keep track of which room IDs we've seen so we can de-dup
+ room_ids = set()
+
+ # tag all the ones in our list with our server name.
+ # Also add the them to the de-deping set
+ for room in public_rooms['chunk']:
+ room["server_name"] = self.hs.hostname
+ room_ids.add(room["room_id"])
+
+ # Now add the results from federation
+ for server_name, server_result in self.remote_list_cache.items():
+ for room in server_result["chunk"]:
+ if room["room_id"] not in room_ids:
+ room["server_name"] = server_name
+ public_rooms["chunk"].append(room)
+ room_ids.add(room["room_id"])
+
+ defer.returnValue(public_rooms)
+
class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 33b79c0ec7..cbec4d30ae 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -140,8 +140,6 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs):
- self.hs = hs
-
self.user_to_user_stream = {}
self.room_to_user_streams = {}
self.appservice_to_user_streams = {}
@@ -151,6 +149,8 @@ class Notifier(object):
self.pending_new_room_events = []
self.clock = hs.get_clock()
+ self.appservice_handler = hs.get_application_service_handler()
+ self.state_handler = hs.get_state_handler()
hs.get_distributor().observe(
"user_joined_room", self._user_joined_room
@@ -232,9 +232,7 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- self.hs.get_handlers().appservice_handler.notify_interested_services(
- event
- )
+ self.appservice_handler.notify_interested_services(event)
app_streams = set()
@@ -449,7 +447,7 @@ class Notifier(object):
@defer.inlineCallbacks
def _is_world_readable(self, room_id):
- state = yield self.hs.get_state_handler().get_current_state(
+ state = yield self.state_handler.get_current_state(
room_id,
EventTypes.RoomHistoryVisibility
)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 644aa4e513..db52a1fc39 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -279,8 +279,9 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
- handler = self.handlers.room_list_handler
- data = yield handler.get_public_room_list()
+ handler = self.hs.get_room_list_handler()
+ data = yield handler.get_aggregated_public_room_list()
+
defer.returnValue((200, data))
diff --git a/synapse/server.py b/synapse/server.py
index 01f828819f..7cf22b1eea 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -22,6 +22,8 @@
from twisted.web.client import BrowserLikePolicyForHTTPS
from twisted.enterprise import adbapi
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.appservice.api import ApplicationServiceApi
from synapse.federation import initialize_http_replication
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
from synapse.notifier import Notifier
@@ -30,6 +32,8 @@ from synapse.handlers import Handlers
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
+from synapse.handlers.room import RoomListHandler
+from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.state import StateHandler
from synapse.storage import DataStore
from synapse.util import Clock
@@ -84,6 +88,10 @@ class HomeServer(object):
'presence_handler',
'sync_handler',
'typing_handler',
+ 'room_list_handler',
+ 'application_service_api',
+ 'application_service_scheduler',
+ 'application_service_handler',
'notifier',
'distributor',
'client_resource',
@@ -179,6 +187,18 @@ class HomeServer(object):
def build_sync_handler(self):
return SyncHandler(self)
+ def build_room_list_handler(self):
+ return RoomListHandler(self)
+
+ def build_application_service_api(self):
+ return ApplicationServiceApi(self)
+
+ def build_application_service_scheduler(self):
+ return ApplicationServiceScheduler(self)
+
+ def build_application_service_handler(self):
+ return ApplicationServicesHandler(self)
+
def build_event_sources(self):
return EventSources(self)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d970fde9e8..8581796b7e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
-from ._base import Cache
+from ._base import Cache, LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
def __init__(self, db_conn, hs):
self.hs = hs
+ self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.client_ip_last_seen = Cache(
@@ -173,6 +174,19 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=push_rules_prefill,
)
+ cur = LoggingTransaction(
+ db_conn.cursor(),
+ name="_find_stream_orderings_for_times_txn",
+ database_engine=self.database_engine,
+ after_callbacks=[]
+ )
+ self._find_stream_orderings_for_times_txn(cur)
+ cur.close()
+
+ self.find_stream_orderings_looping_call = self._clock.looping_call(
+ self._find_stream_orderings_for_times, 60 * 60 * 1000
+ )
+
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e0d7098692..56a0dd80f3 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -153,7 +153,6 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
- self._clock = hs.get_clock()
self._previous_txn_total_time = 0
self._current_txn_total_time = 0
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9705db5c47..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
+ def __init__(self, hs):
+ self.stream_ordering_month_ago = None
+ super(EventPushActionsStore, self).__init__(hs)
+
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -115,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id,
min_stream_ordering,
- max_stream_ordering=None):
+ max_stream_ordering=None,
+ limit=20):
def get_after_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
@@ -147,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering)
- sql += " ORDER BY ep.stream_ordering ASC"
+ sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+ args.append(limit)
txn.execute(sql, args)
return txn.fetchall()
after_read_receipt = yield self.runInteraction(
@@ -224,18 +230,93 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
- def _remove_push_actions_before_txn(self, txn, room_id, user_id,
- topological_ordering):
+ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+ topological_ordering):
+ """
+ Purges old, stale push actions for a user and room before a given
+ topological_ordering
+ Args:
+ txn: The transcation
+ room_id: Room ID to delete from
+ user_id: user ID to delete for
+ topological_ordering: The lowest topological ordering which will
+ not be deleted.
+ """
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id, )
)
+
+ # We need to join on the events table to get the received_ts for
+ # event_push_actions and sqlite won't let us use a join in a delete so
+ # we can't just delete where received_ts < x. Furthermore we can
+ # only identify event_push_actions by a tuple of room_id, event_id
+ # we we can't use a subquery.
+ # Instead, we look up the stream ordering for the last event in that
+ # room received before the threshold time and delete event_push_actions
+ # in the room with a stream_odering before that.
txn.execute(
- "DELETE FROM event_push_actions"
- " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
- (room_id, user_id, topological_ordering,)
+ "DELETE FROM event_push_actions "
+ " WHERE user_id = ? AND room_id = ? AND "
+ " topological_ordering < ? AND stream_ordering < ?",
+ (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+ )
+
+ @defer.inlineCallbacks
+ def _find_stream_orderings_for_times(self):
+ yield self.runInteraction(
+ "_find_stream_orderings_for_times",
+ self._find_stream_orderings_for_times_txn
+ )
+
+ def _find_stream_orderings_for_times_txn(self, txn):
+ logger.info("Searching for stream ordering 1 month ago")
+ self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 month ago: it's %d",
+ self.stream_ordering_month_ago
)
+ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+ """
+ Find the stream_ordering of the first event that was received after
+ a given timestamp. This is relatively slow as there is no index on
+ received_ts but we can then use this to delete push actions before
+ this.
+
+ received_ts must necessarily be in the same order as stream_ordering
+ and stream_ordering is indexed, so we manually binary search using
+ stream_ordering
+ """
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ max_stream_ordering = txn.fetchone()[0]
+
+ if max_stream_ordering is None:
+ return 0
+
+ range_start = 0
+ range_end = max_stream_ordering
+
+ sql = (
+ "SELECT received_ts FROM events"
+ " WHERE stream_ordering > ?"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ )
+
+ while range_end - range_start > 1:
+ middle = int((range_end + range_start) / 2)
+ txn.execute(sql, (middle,))
+ middle_ts = txn.fetchone()[0]
+ if ts > middle_ts:
+ range_start = middle
+ else:
+ range_end = middle
+
+ return range_end
+
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index f285f59afd..216a8bf69c 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -66,7 +66,10 @@ class PushRuleStore(SQLBaseStore):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: []
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules",
@@ -90,7 +93,10 @@ class PushRuleStore(SQLBaseStore):
if not user_ids:
defer.returnValue({})
- results = {}
+ results = {
+ user_id: {}
+ for user_id in user_ids
+ }
rows = yield self._simple_select_many_batch(
table="push_rules_enable",
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..f1774f0e44 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
)
if receipt_type == "m.read" and topological_ordering:
- self._remove_push_actions_before_txn(
+ self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
user_id=user_id,
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index b417e3ac08..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from synapse.storage.appservice import ApplicationServiceStore
+from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
logger.warning("Could not get app_service_config_files from config")
pass
- appservices = ApplicationServiceStore.load_appservices(
+ appservices = load_appservices(
config.server_name, config_files
)
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 7ddbbb9b4a..a884c95f8d 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -30,9 +30,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastore = Mock(return_value=self.mock_store)
- self.handler = ApplicationServicesHandler(
- hs, self.mock_as_api, self.mock_scheduler
- )
+ hs.get_application_service_api = Mock(return_value=self.mock_as_api)
+ hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
+ self.handler = ApplicationServicesHandler(hs)
@defer.inlineCallbacks
def test_notify_interested_services(self):
diff --git a/tests/utils.py b/tests/utils.py
index 59d985b5f2..006abedbc1 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -67,6 +67,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
version_string="Synapse/tests",
database_engine=create_engine(config.database_config),
get_db_conn=db_pool.get_db_conn,
+ room_list_handler=object(),
**kargs
)
hs.setup()
@@ -75,6 +76,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
name, db_pool=None, datastore=datastore, config=config,
version_string="Synapse/tests",
database_engine=create_engine(config.database_config),
+ room_list_handler=object(),
**kargs
)
|