diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 3ba5335af7..ba49075a20 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -85,6 +85,12 @@ class RoomMemberHandler(BaseHandler):
prev_event_ids=prev_event_ids,
)
+ # Check if this event matches the previous membership event for the user.
+ duplicate = yield msg_handler.deduplicate_state_event(event, context)
+ if duplicate is not None:
+ # Discard the new event since this membership change is a no-op.
+ return
+
yield msg_handler.handle_new_client_event(
requester,
event,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 14f2032afa..b5962f4f5a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -576,7 +576,7 @@ class SyncHandler(object):
logger.debug("Getting messages up to %d", now_token.to_device_key)
messages, stream_id = yield self.store.get_new_messages_for_device(
- user_id, device_id, now_token.to_device_key
+ user_id, device_id, since_stream_id, now_token.to_device_key
)
logger.debug("Got messages up to %d: %r", stream_id, messages)
sync_result_builder.now_token = now_token.copy_and_replace(
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 2fa0a218b9..68116b0394 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -85,7 +85,7 @@ class DeviceInboxStore(SQLBaseStore):
defer.returnValue(self._device_inbox_id_gen.get_current_token())
def get_new_messages_for_device(
- self, user_id, device_id, current_stream_id, limit=100
+ self, user_id, device_id, last_stream_id, current_stream_id, limit=100
):
"""
Args:
@@ -101,11 +101,13 @@ class DeviceInboxStore(SQLBaseStore):
sql = (
"SELECT stream_id, message_json FROM device_inbox"
" WHERE user_id = ? AND device_id = ?"
- " AND stream_id <= ?"
+ " AND ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
" LIMIT ?"
)
- txn.execute(sql, (user_id, device_id, current_stream_id, limit))
+ txn.execute(sql, (
+ user_id, device_id, last_stream_id, current_stream_id, limit
+ ))
messages = []
for row in txn.fetchall():
stream_pos = row[0]
|