|
4 | 4 | using System.Buffers; |
5 | 5 | using System.Collections.Generic; |
6 | 6 | using System.Diagnostics; |
| 7 | +using System.Diagnostics.CodeAnalysis; |
7 | 8 | using System.IO; |
8 | 9 | using System.IO.Pipelines; |
9 | 10 | using System.Linq; |
|
16 | 17 | using System.Text; |
17 | 18 | using System.Threading; |
18 | 19 | using System.Threading.Tasks; |
| 20 | +using static StackExchange.Redis.Message; |
19 | 21 |
|
20 | 22 | namespace StackExchange.Redis |
21 | 23 | { |
@@ -396,9 +398,8 @@ public void RecordConnectionFailed( |
396 | 398 | lock (_writtenAwaitingResponse) |
397 | 399 | { |
398 | 400 | // find oldest message awaiting a response |
399 | | - if (_writtenAwaitingResponse.Count != 0) |
| 401 | + if (_writtenAwaitingResponse.TryPeek(out var next)) |
400 | 402 | { |
401 | | - var next = _writtenAwaitingResponse.Peek(); |
402 | 403 | unansweredWriteTime = next.GetWriteTime(); |
403 | 404 | } |
404 | 405 | } |
@@ -478,34 +479,42 @@ void add(string lk, string sk, string? v) |
478 | 479 | bridge?.OnConnectionFailed(this, failureType, outerException); |
479 | 480 | } |
480 | 481 | } |
481 | | - // cleanup |
| 482 | + // clean up (note: avoid holding the lock when we complete things, even if this means taking |
| 483 | + // the lock multiple times; this is fine here - we shouldn't be fighting anyone, and we're already toast) |
482 | 484 | lock (_writtenAwaitingResponse) |
483 | 485 | { |
484 | 486 | bridge?.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count); |
485 | | - while (_writtenAwaitingResponse.Count != 0) |
486 | | - { |
487 | | - var next = _writtenAwaitingResponse.Dequeue(); |
| 487 | + } |
488 | 488 |
|
489 | | - if (next.Command == RedisCommand.QUIT && next.TrySetResult(true)) |
490 | | - { |
491 | | - // fine, death of a socket is close enough |
492 | | - next.Complete(); |
493 | | - } |
494 | | - else |
| 489 | + while (TryDequeueLocked(_writtenAwaitingResponse, out var next)) |
| 490 | + { |
| 491 | + if (next.Command == RedisCommand.QUIT && next.TrySetResult(true)) |
| 492 | + { |
| 493 | + // fine, death of a socket is close enough |
| 494 | + next.Complete(); |
| 495 | + } |
| 496 | + else |
| 497 | + { |
| 498 | + var ex = innerException is RedisException ? innerException : outerException; |
| 499 | + if (bridge != null) |
495 | 500 | { |
496 | | - var ex = innerException is RedisException ? innerException : outerException; |
497 | | - if (bridge != null) |
498 | | - { |
499 | | - bridge.Trace("Failing: " + next); |
500 | | - bridge.Multiplexer?.OnMessageFaulted(next, ex, origin); |
501 | | - } |
502 | | - next.SetExceptionAndComplete(ex!, bridge); |
| 501 | + bridge.Trace("Failing: " + next); |
| 502 | + bridge.Multiplexer?.OnMessageFaulted(next, ex, origin); |
503 | 503 | } |
| 504 | + next.SetExceptionAndComplete(ex!, bridge); |
504 | 505 | } |
505 | 506 | } |
506 | 507 |
|
507 | 508 | // burn the socket |
508 | 509 | Shutdown(); |
| 510 | + |
| 511 | + static bool TryDequeueLocked(Queue<Message> queue, [NotNullWhen(true)] out Message? message) |
| 512 | + { |
| 513 | + lock (queue) |
| 514 | + { |
| 515 | + return queue.TryDequeue(out message); |
| 516 | + } |
| 517 | + } |
509 | 518 | } |
510 | 519 |
|
511 | 520 | internal bool IsIdle() => _writeStatus == WriteStatus.Idle; |
@@ -1580,18 +1589,10 @@ private void MatchResult(in RawResult result) |
1580 | 1589 | _readStatus = ReadStatus.DequeueResult; |
1581 | 1590 | lock (_writtenAwaitingResponse) |
1582 | 1591 | { |
1583 | | -#if NET5_0_OR_GREATER |
1584 | 1592 | if (!_writtenAwaitingResponse.TryDequeue(out msg)) |
1585 | 1593 | { |
1586 | 1594 | throw new InvalidOperationException("Received response with no message waiting: " + result.ToString()); |
1587 | 1595 | } |
1588 | | -#else |
1589 | | - if (_writtenAwaitingResponse.Count == 0) |
1590 | | - { |
1591 | | - throw new InvalidOperationException("Received response with no message waiting: " + result.ToString()); |
1592 | | - } |
1593 | | - msg = _writtenAwaitingResponse.Dequeue(); |
1594 | | -#endif |
1595 | 1596 | } |
1596 | 1597 | _activeMessage = msg; |
1597 | 1598 |
|
@@ -1632,9 +1633,23 @@ static bool TryGetPubSubPayload(in RawResult value, out RedisValue parsed, bool |
1632 | 1633 | internal void GetHeadMessages(out Message? now, out Message? next) |
1633 | 1634 | { |
1634 | 1635 | now = _activeMessage; |
1635 | | - lock(_writtenAwaitingResponse) |
| 1636 | + bool haveLock = false; |
| 1637 | + try |
| 1638 | + { |
| 1639 | + // careful locking here; a: don't try too hard (this is error info only), b: avoid deadlock (see #2376) |
| 1640 | + Monitor.TryEnter(_writtenAwaitingResponse, 10, ref haveLock); |
| 1641 | + if (haveLock) |
| 1642 | + { |
| 1643 | + _writtenAwaitingResponse.TryPeek(out next); |
| 1644 | + } |
| 1645 | + else |
| 1646 | + { |
| 1647 | + next = UnknownMessage.Instance; |
| 1648 | + } |
| 1649 | + } |
| 1650 | + finally |
1636 | 1651 | { |
1637 | | - next = _writtenAwaitingResponse.Count == 0 ? null : _writtenAwaitingResponse.Peek(); |
| 1652 | + if (haveLock) Monitor.Exit(_writtenAwaitingResponse); |
1638 | 1653 | } |
1639 | 1654 | } |
1640 | 1655 |
|
|
0 commit comments