diff --git a/CHANGES.rst b/CHANGES.rst
index c40a32abd6..23be6c8efa 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,10 +1,17 @@
+Changes in synapse v0.17.3 (2016-09-09)
+=======================================
+
+This release fixes a major bug that stopped servers from handling rooms with
+over 1000 members.
+
+
Changes in synapse v0.17.2 (2016-09-08)
=======================================
This release contains security bug fixes. Please upgrade.
-No changes since v0.17.2
+No changes since v0.17.2-rc1
Changes in synapse v0.17.2-rc1 (2016-09-05)
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 523deaa5ff..b778cd65c9 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.17.2"
+__version__ = "0.17.3"
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 633c79c352..1ac569b305 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -170,43 +170,68 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def _attempt_new_transaction(self, destination):
- yield run_on_reactor()
- while True:
- # list of (pending_pdu, deferred, order)
- if destination in self.pending_transactions:
- # XXX: pending_transactions can get stuck on by a never-ending
- # request at which point pending_pdus_by_dest just keeps growing.
- # we need application-layer timeouts of some flavour of these
- # requests
- logger.debug(
- "TX [%s] Transaction already in progress",
- destination
- )
- return
+ # list of (pending_pdu, deferred, order)
+ if destination in self.pending_transactions:
+ # XXX: pending_transactions can get stuck on by a never-ending
+ # request at which point pending_pdus_by_dest just keeps growing.
+ # we need application-layer timeouts of some flavour of these
+ # requests
+ logger.debug(
+ "TX [%s] Transaction already in progress",
+ destination
+ )
+ return
- pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
- pending_edus = self.pending_edus_by_dest.pop(destination, [])
- pending_failures = self.pending_failures_by_dest.pop(destination, [])
+ try:
+ self.pending_transactions[destination] = 1
- device_message_edus, device_stream_id = (
- yield self._get_new_device_messages(destination)
- )
+ yield run_on_reactor()
- pending_edus.extend(device_message_edus)
+ while True:
+ pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+ pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ pending_failures = self.pending_failures_by_dest.pop(destination, [])
- if pending_pdus:
- logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
- destination, len(pending_pdus))
+ limiter = yield get_retry_limiter(
+ destination,
+ self.clock,
+ self.store,
+ )
- if not pending_pdus and not pending_edus and not pending_failures:
- logger.debug("TX [%s] Nothing to send", destination)
- return
+ device_message_edus, device_stream_id = (
+ yield self._get_new_device_messages(destination)
+ )
- yield self._send_new_transaction(
- destination, pending_pdus, pending_edus, pending_failures,
- device_stream_id,
- should_delete_from_device_stream=bool(device_message_edus)
+ pending_edus.extend(device_message_edus)
+
+ if pending_pdus:
+ logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+ destination, len(pending_pdus))
+
+ if not pending_pdus and not pending_edus and not pending_failures:
+ logger.debug("TX [%s] Nothing to send", destination)
+ self.last_device_stream_id_by_dest[destination] = (
+ device_stream_id
+ )
+ return
+
+ success = yield self._send_new_transaction(
+ destination, pending_pdus, pending_edus, pending_failures,
+ device_stream_id,
+ should_delete_from_device_stream=bool(device_message_edus),
+ limiter=limiter,
+ )
+ if not success:
+ break
+ except NotRetryingDestination:
+ logger.info(
+ "TX [%s] not ready for retry yet - "
+ "dropping transaction for now",
+ destination,
)
+ finally:
+ # We want to be *very* sure we delete this after we stop processing
+ self.pending_transactions.pop(destination, None)
@defer.inlineCallbacks
def _get_new_device_messages(self, destination):
@@ -230,151 +255,142 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
pending_failures, device_stream_id,
- should_delete_from_device_stream):
+ should_delete_from_device_stream, limiter):
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[1])
- pdus = [x[0] for x in pending_pdus]
- edus = pending_edus
- failures = [x.get_dict() for x in pending_failures]
+ # Sort based on the order field
+ pending_pdus.sort(key=lambda t: t[1])
+ pdus = [x[0] for x in pending_pdus]
+ edus = pending_edus
+ failures = [x.get_dict() for x in pending_failures]
- try:
- self.pending_transactions[destination] = 1
+ success = True
- logger.debug("TX [%s] _attempt_new_transaction", destination)
+ try:
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
- txn_id = str(self._next_txn_id)
+ txn_id = str(self._next_txn_id)
- limiter = yield get_retry_limiter(
- destination,
- self.clock,
- self.store,
- )
+ logger.debug(
+ "TX [%s] {%s} Attempting new transaction"
+ " (pdus: %d, edus: %d, failures: %d)",
+ destination, txn_id,
+ len(pdus),
+ len(edus),
+ len(failures)
+ )
- logger.debug(
- "TX [%s] {%s} Attempting new transaction"
- " (pdus: %d, edus: %d, failures: %d)",
- destination, txn_id,
- len(pdus),
- len(edus),
- len(failures)
- )
+ logger.debug("TX [%s] Persisting transaction...", destination)
+
+ transaction = Transaction.create_new(
+ origin_server_ts=int(self.clock.time_msec()),
+ transaction_id=txn_id,
+ origin=self.server_name,
+ destination=destination,
+ pdus=pdus,
+ edus=edus,
+ pdu_failures=failures,
+ )
- logger.debug("TX [%s] Persisting transaction...", destination)
+ self._next_txn_id += 1
- transaction = Transaction.create_new(
- origin_server_ts=int(self.clock.time_msec()),
- transaction_id=txn_id,
- origin=self.server_name,
- destination=destination,
- pdus=pdus,
- edus=edus,
- pdu_failures=failures,
- )
+ yield self.transaction_actions.prepare_to_send(transaction)
- self._next_txn_id += 1
+ logger.debug("TX [%s] Persisted transaction", destination)
+ logger.info(
+ "TX [%s] {%s} Sending transaction [%s],"
+ " (PDUs: %d, EDUs: %d, failures: %d)",
+ destination, txn_id,
+ transaction.transaction_id,
+ len(pdus),
+ len(edus),
+ len(failures),
+ )
- yield self.transaction_actions.prepare_to_send(transaction)
+ with limiter:
+ # Actually send the transaction
+
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def json_data_cb():
+ data = transaction.get_dict()
+ now = int(self.clock.time_msec())
+ if "pdus" in data:
+ for p in data["pdus"]:
+ if "age_ts" in p:
+ unsigned = p.setdefault("unsigned", {})
+ unsigned["age"] = now - int(p["age_ts"])
+ del p["age_ts"]
+ return data
+
+ try:
+ response = yield self.transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+
+ if response:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "Transaction returned error for %s: %s",
+ e_id, r,
+ )
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
- logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
- "TX [%s] {%s} Sending transaction [%s],"
- " (PDUs: %d, EDUs: %d, failures: %d)",
- destination, txn_id,
- transaction.transaction_id,
- len(pdus),
- len(edus),
- len(failures),
+ "TX [%s] {%s} got %d response",
+ destination, txn_id, code
)
- with limiter:
- # Actually send the transaction
-
- # FIXME (erikj): This is a bit of a hack to make the Pdu age
- # keys work
- def json_data_cb():
- data = transaction.get_dict()
- now = int(self.clock.time_msec())
- if "pdus" in data:
- for p in data["pdus"]:
- if "age_ts" in p:
- unsigned = p.setdefault("unsigned", {})
- unsigned["age"] = now - int(p["age_ts"])
- del p["age_ts"]
- return data
-
- try:
- response = yield self.transport_layer.send_transaction(
- transaction, json_data_cb
- )
- code = 200
-
- if response:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
- logger.warn(
- "Transaction returned error for %s: %s",
- e_id, r,
- )
- except HttpResponseException as e:
- code = e.code
- response = e.response
+ logger.debug("TX [%s] Sent transaction", destination)
+ logger.debug("TX [%s] Marking as delivered...", destination)
- logger.info(
- "TX [%s] {%s} got %d response",
- destination, txn_id, code
- )
-
- logger.debug("TX [%s] Sent transaction", destination)
- logger.debug("TX [%s] Marking as delivered...", destination)
+ yield self.transaction_actions.delivered(
+ transaction, code, response
+ )
- yield self.transaction_actions.delivered(
- transaction, code, response
- )
+ logger.debug("TX [%s] Marked as delivered", destination)
- logger.debug("TX [%s] Marked as delivered", destination)
+ if code != 200:
+ for p in pdus:
+ logger.info(
+ "Failed to send event %s to %s", p.event_id, destination
+ )
+ success = False
+ else:
+ # Remove the acknowledged device messages from the database
+ if should_delete_from_device_stream:
+ yield self.store.delete_device_msgs_for_remote(
+ destination, device_stream_id
+ )
+ self.last_device_stream_id_by_dest[destination] = device_stream_id
+ except RuntimeError as e:
+ # We capture this here as there as nothing actually listens
+ # for this finishing functions deferred.
+ logger.warn(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
- if code != 200:
- for p in pdus:
- logger.info(
- "Failed to send event %s to %s", p.event_id, destination
- )
- else:
- # Remove the acknowledged device messages from the database
- if should_delete_from_device_stream:
- yield self.store.delete_device_msgs_for_remote(
- destination, device_stream_id
- )
- self.last_device_stream_id_by_dest[destination] = device_stream_id
- except NotRetryingDestination:
- logger.info(
- "TX [%s] not ready for retry yet - "
- "dropping transaction for now",
- destination,
- )
- except RuntimeError as e:
- # We capture this here as there as nothing actually listens
- # for this finishing functions deferred.
- logger.warn(
- "TX [%s] Problem in _attempt_transaction: %s",
- destination,
- e,
- )
+ success = False
+
+ for p in pdus:
+ logger.info("Failed to send event %s to %s", p.event_id, destination)
+ except Exception as e:
+ # We capture this here as there as nothing actually listens
+ # for this finishing functions deferred.
+ logger.warn(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
- for p in pdus:
- logger.info("Failed to send event %s to %s", p.event_id, destination)
- except Exception as e:
- # We capture this here as there as nothing actually listens
- # for this finishing functions deferred.
- logger.warn(
- "TX [%s] Problem in _attempt_transaction: %s",
- destination,
- e,
- )
+ success = False
- for p in pdus:
- logger.info("Failed to send event %s to %s", p.event_id, destination)
+ for p in pdus:
+ logger.info("Failed to send event %s to %s", p.event_id, destination)
- finally:
- # We want to be *very* sure we delete this after we stop processing
- self.pending_transactions.pop(destination, None)
+ defer.returnValue(success)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7a3c16a8aa..16dbddee03 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -265,6 +265,12 @@ class PresenceHandler(object):
to_notify = {} # Changes we want to notify everyone about
to_federation_ping = {} # These need sending keep-alives
+ # Only bother handling the last presence change for each user
+ new_states_dict = {}
+ for new_state in new_states:
+ new_states_dict[new_state.user_id] = new_state
+ new_state = new_states_dict.values()
+
for new_state in new_states:
user_id = new_state.user_id
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 1ed9034bcb..857bc9795c 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -181,7 +181,7 @@ class ReplicationResource(Resource):
def replicate(self, request_streams, limit):
writer = _Writer()
current_token = yield self.current_replication_token()
- logger.info("Replicating up to %r", current_token)
+ logger.debug("Replicating up to %r", current_token)
yield self.account_data(writer, current_token, limit, request_streams)
yield self.events(writer, current_token, limit, request_streams)
@@ -195,7 +195,7 @@ class ReplicationResource(Resource):
yield self.to_device(writer, current_token, limit, request_streams)
self.streams(writer, current_token, request_streams)
- logger.info("Replicated %d rows", writer.total)
+ logger.debug("Replicated %d rows", writer.total)
defer.returnValue(writer.finish())
def streams(self, writer, current_token, request_streams):
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 251078ba57..3bfd5e8213 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -23,7 +23,7 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceInboxStore, self).__init__(db_conn, hs)
self._device_inbox_id_gen = SlavedIdTracker(
- db_conn, "device_inbox", "stream_id",
+ db_conn, "device_max_stream_id", "stream_id",
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3c933f1620..22d6a7d31e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -22,7 +22,7 @@ from synapse.streams.config import PaginationConfig
from synapse.api.constants import EventTypes, Membership
from synapse.api.filtering import Filter
from synapse.types import UserID, RoomID, RoomAlias
-from synapse.events.utils import serialize_event
+from synapse.events.utils import serialize_event, format_event_for_client_v2
from synapse.http.servlet import parse_json_object_from_request, parse_string
import logging
@@ -120,6 +120,8 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_type, state_key):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+ format = parse_string(request, "format", default="content",
+ allowed_values=["content", "event"])
msg_handler = self.handlers.message_handler
data = yield msg_handler.get_room_data(
@@ -134,7 +136,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
raise SynapseError(
404, "Event not found.", errcode=Codes.NOT_FOUND
)
- defer.returnValue((200, data.get_dict()["content"]))
+
+ if format == "event":
+ event = format_event_for_client_v2(data.get_dict())
+ defer.returnValue((200, event))
+ elif format == "content":
+ defer.returnValue((200, data.get_dict()["content"]))
@defer.inlineCallbacks
def on_PUT(self, request, room_id, event_type, state_key, txn_id=None):
diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py
index f1a48acf07..fd2a3d69d4 100644
--- a/synapse/rest/client/v2_alpha/notifications.py
+++ b/synapse/rest/client/v2_alpha/notifications.py
@@ -45,11 +45,12 @@ class NotificationsServlet(RestServlet):
from_token = parse_string(request, "from", required=False)
limit = parse_integer(request, "limit", default=50)
+ only = parse_string(request, "only", required=False)
limit = min(limit, 500)
push_actions = yield self.store.get_push_actions_for_user(
- user_id, from_token, limit
+ user_id, from_token, limit, only_highlight=(only == "highlight")
)
receipts_by_room = yield self.store.get_receipts_for_user_with_orderings(
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 828e5ca60b..a61e83d5de 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -111,7 +111,7 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "presence_stream", "stream_id"
)
self._device_inbox_id_gen = StreamIdGenerator(
- db_conn, "device_inbox", "stream_id"
+ db_conn, "device_max_stream_id", "stream_id"
)
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 658fbef27b..b729b7106e 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -130,6 +130,13 @@ class DeviceInboxStore(SQLBaseStore):
def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
messages_by_user_then_device):
+ sql = (
+ "UPDATE device_max_stream_id"
+ " SET stream_id = ?"
+ " WHERE stream_id < ?"
+ )
+ txn.execute(sql, (stream_id, stream_id))
+
local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
messages_json_for_user = {}
@@ -148,6 +155,8 @@ class DeviceInboxStore(SQLBaseStore):
device = row[0]
messages_json_for_user[device] = message_json
else:
+ if not devices:
+ continue
sql = (
"SELECT device_id FROM devices"
" WHERE user_id = ? AND device_id IN ("
@@ -164,7 +173,11 @@ class DeviceInboxStore(SQLBaseStore):
message_json = ujson.dumps(messages_by_device[device])
messages_json_for_user[device] = message_json
- local_by_user_then_device[user_id] = messages_json_for_user
+ if messages_json_for_user:
+ local_by_user_then_device[user_id] = messages_json_for_user
+
+ if not local_by_user_then_device:
+ return
sql = (
"INSERT INTO device_inbox"
@@ -301,7 +314,7 @@ class DeviceInboxStore(SQLBaseStore):
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
)
- if not has_changed:
+ if not has_changed or last_stream_id == current_stream_id:
return defer.succeed(([], current_stream_id))
def get_new_messages_for_remote_destination_txn(txn):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index eb15fb751b..10e9305f7b 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -338,14 +338,21 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue(notifs[:limit])
@defer.inlineCallbacks
- def get_push_actions_for_user(self, user_id, before=None, limit=50):
+ def get_push_actions_for_user(self, user_id, before=None, limit=50,
+ only_highlight=False):
def f(txn):
before_clause = ""
if before:
- before_clause = "AND stream_ordering < ?"
+ before_clause = "AND epa.stream_ordering < ?"
args = [user_id, before, limit]
else:
args = [user_id, limit]
+
+ if only_highlight:
+ if len(before_clause) > 0:
+ before_clause += " "
+ before_clause += "AND epa.highlight = 1"
+
sql = (
"SELECT epa.event_id, epa.room_id,"
" epa.stream_ordering, epa.topological_ordering,"
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 6ab10db328..866d64e679 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -402,7 +402,7 @@ class RoomMemberStore(SQLBaseStore):
keyvalues={
"membership": Membership.JOIN,
},
- batch_size=1000,
+ batch_size=500,
desc="_get_joined_users_from_context",
)
diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/35/device_outbox.sql
index e87066d9a1..17e6c43105 100644
--- a/synapse/storage/schema/delta/34/device_outbox.sql
+++ b/synapse/storage/schema/delta/35/device_outbox.sql
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+DROP TABLE IF EXISTS device_federation_outbox;
CREATE TABLE device_federation_outbox (
destination TEXT NOT NULL,
stream_id BIGINT NOT NULL,
@@ -21,16 +22,18 @@ CREATE TABLE device_federation_outbox (
);
+DROP INDEX IF EXISTS device_federation_outbox_destination_id;
CREATE INDEX device_federation_outbox_destination_id
ON device_federation_outbox(destination, stream_id);
+DROP TABLE IF EXISTS device_federation_inbox;
CREATE TABLE device_federation_inbox (
origin TEXT NOT NULL,
message_id TEXT NOT NULL,
received_ts BIGINT NOT NULL
);
-
+DROP INDEX IF EXISTS device_federation_inbox_sender_id;
CREATE INDEX device_federation_inbox_sender_id
ON device_federation_inbox(origin, message_id);
diff --git a/synapse/storage/schema/delta/35/device_stream_id.sql b/synapse/storage/schema/delta/35/device_stream_id.sql
new file mode 100644
index 0000000000..7ab7d942e2
--- /dev/null
+++ b/synapse/storage/schema/delta/35/device_stream_id.sql
@@ -0,0 +1,21 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE device_max_stream_id (
+ stream_id BIGINT NOT NULL
+);
+
+INSERT INTO device_max_stream_id (stream_id)
+ SELECT COALESCE(MAX(stream_id), 0) FROM device_inbox;
|