diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 330 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 6 | ||||
-rw-r--r-- | synapse/replication/resource.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/deviceinbox.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 11 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/notifications.py | 3 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/deviceinbox.py | 17 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 11 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 2 | ||||
-rw-r--r-- | synapse/storage/schema/delta/35/device_outbox.sql (renamed from synapse/storage/schema/delta/34/device_outbox.sql) | 5 | ||||
-rw-r--r-- | synapse/storage/schema/delta/35/device_stream_id.sql | 21 |
13 files changed, 245 insertions, 171 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 523deaa5ff..b778cd65c9 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.17.2" +__version__ = "0.17.3" diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 633c79c352..1ac569b305 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -170,43 +170,68 @@ class TransactionQueue(object): @defer.inlineCallbacks def _attempt_new_transaction(self, destination): - yield run_on_reactor() - while True: - # list of (pending_pdu, deferred, order) - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - logger.debug( - "TX [%s] Transaction already in progress", - destination - ) - return + # list of (pending_pdu, deferred, order) + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + logger.debug( + "TX [%s] Transaction already in progress", + destination + ) + return - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_failures = self.pending_failures_by_dest.pop(destination, []) + try: + self.pending_transactions[destination] = 1 - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) + yield run_on_reactor() - pending_edus.extend(device_message_edus) + while True: + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest.pop(destination, []) - if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) - if not pending_pdus and not pending_edus and not pending_failures: - logger.debug("TX [%s] Nothing to send", destination) - return + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) - yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures, - device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) + pending_edus.extend(device_message_edus) + + if pending_pdus: + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) + + if not pending_pdus and not pending_edus and not pending_failures: + logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = ( + device_stream_id + ) + return + + success = yield self._send_new_transaction( + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus), + limiter=limiter, + ) + if not success: + break + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, ) + finally: + # We want to be *very* sure we delete this after we stop processing + self.pending_transactions.pop(destination, None) @defer.inlineCallbacks def _get_new_device_messages(self, destination): @@ -230,151 +255,142 @@ class TransactionQueue(object): @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream): + should_delete_from_device_stream, limiter): - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus - failures = [x.get_dict() for x in pending_failures] + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[1]) + pdus = [x[0] for x in pending_pdus] + edus = pending_edus + failures = [x.get_dict() for x in pending_failures] - try: - self.pending_transactions[destination] = 1 + success = True - logger.debug("TX [%s] _attempt_new_transaction", destination) + try: + logger.debug("TX [%s] _attempt_new_transaction", destination) - txn_id = str(self._next_txn_id) + txn_id = str(self._next_txn_id) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) + logger.debug( + "TX [%s] {%s} Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, txn_id, + len(pdus), + len(edus), + len(failures) + ) - logger.debug( - "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, txn_id, - len(pdus), - len(edus), - len(failures) - ) + logger.debug("TX [%s] Persisting transaction...", destination) + + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - logger.debug("TX [%s] Persisting transaction...", destination) + self._next_txn_id += 1 - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + yield self.transaction_actions.prepare_to_send(transaction) - self._next_txn_id += 1 + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", + destination, txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + len(failures), + ) - yield self.transaction_actions.prepare_to_send(transaction) + with limiter: + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + + if response: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: + code = e.code + response = e.response - logger.debug("TX [%s] Persisted transaction", destination) logger.info( - "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d, failures: %d)", - destination, txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - len(failures), + "TX [%s] {%s} got %d response", + destination, txn_id, code ) - with limiter: - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) - except HttpResponseException as e: - code = e.code - response = e.response + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code - ) - - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) + yield self.transaction_actions.delivered( + transaction, code, response + ) - yield self.transaction_actions.delivered( - transaction, code, response - ) + logger.debug("TX [%s] Marked as delivered", destination) - logger.debug("TX [%s] Marked as delivered", destination) + if code != 200: + for p in pdus: + logger.info( + "Failed to send event %s to %s", p.event_id, destination + ) + success = False + else: + # Remove the acknowledged device messages from the database + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) + self.last_device_stream_id_by_dest[destination] = device_stream_id + except RuntimeError as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) - if code != 200: - for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination - ) - else: - # Remove the acknowledged device messages from the database - if should_delete_from_device_stream: - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) - self.last_device_stream_id_by_dest[destination] = device_stream_id - except NotRetryingDestination: - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - except RuntimeError as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + success = False + + for p in pdus: + logger.info("Failed to send event %s to %s", p.event_id, destination) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) - for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - except Exception as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + success = False - for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) + for p in pdus: + logger.info("Failed to send event %s to %s", p.event_id, destination) - finally: - # We want to be *very* sure we delete this after we stop processing - self.pending_transactions.pop(destination, None) + defer.returnValue(success) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7a3c16a8aa..16dbddee03 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -265,6 +265,12 @@ class PresenceHandler(object): to_notify = {} # Changes we want to notify everyone about to_federation_ping = {} # These need sending keep-alives + # Only bother handling the last presence change for each user + new_states_dict = {} + for new_state in new_states: + new_states_dict[new_state.user_id] = new_state + new_state = new_states_dict.values() + for new_state in new_states: user_id = new_state.user_id diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 1ed9034bcb..857bc9795c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -181,7 +181,7 @@ class ReplicationResource(Resource): def replicate(self, request_streams, limit): writer = _Writer() current_token = yield self.current_replication_token() - logger.info("Replicating up to %r", current_token) + logger.debug("Replicating up to %r", current_token) yield self.account_data(writer, current_token, limit, request_streams) yield self.events(writer, current_token, limit, request_streams) @@ -195,7 +195,7 @@ class ReplicationResource(Resource): yield self.to_device(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) - logger.info("Replicated %d rows", writer.total) + logger.debug("Replicated %d rows", writer.total) defer.returnValue(writer.finish()) def streams(self, writer, current_token, request_streams): diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 251078ba57..3bfd5e8213 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -23,7 +23,7 @@ class SlavedDeviceInboxStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceInboxStore, self).__init__(db_conn, hs) self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_inbox", "stream_id", + db_conn, "device_max_stream_id", "stream_id", ) self._device_inbox_stream_cache = StreamChangeCache( "DeviceInboxStreamChangeCache", diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 3c933f1620..22d6a7d31e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -22,7 +22,7 @@ from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter from synapse.types import UserID, RoomID, RoomAlias -from synapse.events.utils import serialize_event +from synapse.events.utils import serialize_event, format_event_for_client_v2 from synapse.http.servlet import parse_json_object_from_request, parse_string import logging @@ -120,6 +120,8 @@ class RoomStateEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, room_id, event_type, state_key): requester = yield self.auth.get_user_by_req(request, allow_guest=True) + format = parse_string(request, "format", default="content", + allowed_values=["content", "event"]) msg_handler = self.handlers.message_handler data = yield msg_handler.get_room_data( @@ -134,7 +136,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet): raise SynapseError( 404, "Event not found.", errcode=Codes.NOT_FOUND ) - defer.returnValue((200, data.get_dict()["content"])) + + if format == "event": + event = format_event_for_client_v2(data.get_dict()) + defer.returnValue((200, event)) + elif format == "content": + defer.returnValue((200, data.get_dict()["content"])) @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py index f1a48acf07..fd2a3d69d4 100644 --- a/synapse/rest/client/v2_alpha/notifications.py +++ b/synapse/rest/client/v2_alpha/notifications.py @@ -45,11 +45,12 @@ class NotificationsServlet(RestServlet): from_token = parse_string(request, "from", required=False) limit = parse_integer(request, "limit", default=50) + only = parse_string(request, "only", required=False) limit = min(limit, 500) push_actions = yield self.store.get_push_actions_for_user( - user_id, from_token, limit + user_id, from_token, limit, only_highlight=(only == "highlight") ) receipts_by_room = yield self.store.get_receipts_for_user_with_orderings( diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 828e5ca60b..a61e83d5de 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -111,7 +111,7 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "presence_stream", "stream_id" ) self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" + db_conn, "device_max_stream_id", "stream_id" ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 658fbef27b..b729b7106e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -130,6 +130,13 @@ class DeviceInboxStore(SQLBaseStore): def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): + sql = ( + "UPDATE device_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(sql, (stream_id, stream_id)) + local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} @@ -148,6 +155,8 @@ class DeviceInboxStore(SQLBaseStore): device = row[0] messages_json_for_user[device] = message_json else: + if not devices: + continue sql = ( "SELECT device_id FROM devices" " WHERE user_id = ? AND device_id IN (" @@ -164,7 +173,11 @@ class DeviceInboxStore(SQLBaseStore): message_json = ujson.dumps(messages_by_device[device]) messages_json_for_user[device] = message_json - local_by_user_then_device[user_id] = messages_json_for_user + if messages_json_for_user: + local_by_user_then_device[user_id] = messages_json_for_user + + if not local_by_user_then_device: + return sql = ( "INSERT INTO device_inbox" @@ -301,7 +314,7 @@ class DeviceInboxStore(SQLBaseStore): has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) - if not has_changed: + if not has_changed or last_stream_id == current_stream_id: return defer.succeed(([], current_stream_id)) def get_new_messages_for_remote_destination_txn(txn): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index eb15fb751b..10e9305f7b 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -338,14 +338,21 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(notifs[:limit]) @defer.inlineCallbacks - def get_push_actions_for_user(self, user_id, before=None, limit=50): + def get_push_actions_for_user(self, user_id, before=None, limit=50, + only_highlight=False): def f(txn): before_clause = "" if before: - before_clause = "AND stream_ordering < ?" + before_clause = "AND epa.stream_ordering < ?" args = [user_id, before, limit] else: args = [user_id, limit] + + if only_highlight: + if len(before_clause) > 0: + before_clause += " " + before_clause += "AND epa.highlight = 1" + sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 6ab10db328..866d64e679 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -402,7 +402,7 @@ class RoomMemberStore(SQLBaseStore): keyvalues={ "membership": Membership.JOIN, }, - batch_size=1000, + batch_size=500, desc="_get_joined_users_from_context", ) diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/35/device_outbox.sql index e87066d9a1..17e6c43105 100644 --- a/synapse/storage/schema/delta/34/device_outbox.sql +++ b/synapse/storage/schema/delta/35/device_outbox.sql @@ -13,6 +13,7 @@ * limitations under the License. */ +DROP TABLE IF EXISTS device_federation_outbox; CREATE TABLE device_federation_outbox ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, @@ -21,16 +22,18 @@ CREATE TABLE device_federation_outbox ( ); +DROP INDEX IF EXISTS device_federation_outbox_destination_id; CREATE INDEX device_federation_outbox_destination_id ON device_federation_outbox(destination, stream_id); +DROP TABLE IF EXISTS device_federation_inbox; CREATE TABLE device_federation_inbox ( origin TEXT NOT NULL, message_id TEXT NOT NULL, received_ts BIGINT NOT NULL ); - +DROP INDEX IF EXISTS device_federation_inbox_sender_id; CREATE INDEX device_federation_inbox_sender_id ON device_federation_inbox(origin, message_id); diff --git a/synapse/storage/schema/delta/35/device_stream_id.sql b/synapse/storage/schema/delta/35/device_stream_id.sql new file mode 100644 index 0000000000..7ab7d942e2 --- /dev/null +++ b/synapse/storage/schema/delta/35/device_stream_id.sql @@ -0,0 +1,21 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE device_max_stream_id ( + stream_id BIGINT NOT NULL +); + +INSERT INTO device_max_stream_id (stream_id) + SELECT COALESCE(MAX(stream_id), 0) FROM device_inbox; |