Skip to content

Commit 2c449c5

Browse files
committed
Execute no SQL when all events are rejected
1 parent 7a03a1b commit 2c449c5

2 files changed

Lines changed: 97 additions & 7 deletions

File tree

core/src/main/scala/akka/persistence/journal/sqlasync/ScalikeJDBCWriteJournal.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ package akka.persistence.journal.sqlasync
33
import akka.persistence.common.StoragePlugin
44
import akka.persistence.journal.AsyncWriteJournal
55
import akka.persistence.{AtomicWrite, PersistentRepr}
6-
import scalikejdbc._
7-
import scalikejdbc.async._
8-
96
import scala.collection.immutable
107
import scala.concurrent.Future
11-
import scala.util.{Success, Try}
8+
import scala.util.{Failure, Success, Try}
9+
import scalikejdbc._
10+
import scalikejdbc.async._
1211

1312
private[sqlasync] trait ScalikeJDBCWriteJournal extends AsyncWriteJournal with StoragePlugin {
1413
private[this] lazy val journalTable = {
@@ -51,8 +50,19 @@ private[sqlasync] trait ScalikeJDBCWriteJournal extends AsyncWriteJournal with S
5150
}
5251
(batch, result) = serialize(keys)
5352
sql = sql"INSERT INTO $journalTable (persistence_key, sequence_nr, message) VALUES $batch"
54-
_ <- logging(sql).update().future()
55-
} yield result
53+
_ <- if (result.forall(_.isFailure)) {
54+
// No insertion is needed.
55+
Future.successful(())
56+
} else {
57+
logging(sql).update().future()
58+
}
59+
} yield {
60+
result.foreach {
61+
case Success(_) =>
62+
case Failure(e) => log.warning("Failed serializing an event.", e)
63+
}
64+
result
65+
}
5666
}
5767
}
5868
}
Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,89 @@
11
package akka.persistence.journal.sqlasync
22

3+
import akka.actor.Actor
4+
import akka.persistence.JournalProtocol.{WriteMessageRejected, WriteMessageSuccess, WriteMessages, WriteMessagesSuccessful}
35
import akka.persistence.helper.MySQLInitializer
46
import akka.persistence.journal.JournalSpec
7+
import akka.persistence.{AtomicWrite, PersistentImpl, PersistentRepr}
8+
import akka.testkit.TestProbe
59
import com.typesafe.config.ConfigFactory
10+
import java.io.NotSerializableException
11+
import scala.concurrent.duration._
612

713
class MySQLAsyncJournalSpec
814
extends JournalSpec(ConfigFactory.load("mysql-application.conf"))
9-
with MySQLInitializer
15+
with MySQLInitializer {
16+
17+
"ScalikeJDBCWriteJournal" must {
18+
"not execute SQL when all the events is not serializable" in {
19+
val probe = TestProbe()
20+
21+
val notSerializableEvent = new Object { override def toString = "not serializable" }
22+
val messages = (6 to 8).map { i =>
23+
AtomicWrite(PersistentRepr(
24+
payload = notSerializableEvent,
25+
sequenceNr = i,
26+
persistenceId = pid,
27+
sender = Actor.noSender,
28+
writerUuid = writerUuid
29+
))
30+
}
31+
journal ! WriteMessages(messages, probe.ref, actorInstanceId)
32+
probe.expectMsg(WriteMessagesSuccessful)
33+
34+
val Pid = pid
35+
val WriterUuid = writerUuid
36+
probe.expectMsgPF() {
37+
case WriteMessageRejected(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), cause, _) =>
38+
payload should be(notSerializableEvent)
39+
cause.isInstanceOf[NotSerializableException] should be(true)
40+
}
41+
probe.expectMsgPF() {
42+
case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid), cause, _) =>
43+
payload should be(notSerializableEvent)
44+
cause.isInstanceOf[NotSerializableException] should be(true)
45+
}
46+
probe.expectMsgPF() {
47+
case WriteMessageRejected(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid), cause, _) =>
48+
payload should be(notSerializableEvent)
49+
cause.isInstanceOf[NotSerializableException] should be(true)
50+
}
51+
probe.expectNoMsg(1.second)
52+
}
53+
54+
"handle partial serialization errors" in {
55+
val probe = TestProbe()
56+
57+
val notSerializableEvent = new Object { override def toString = "not serializable" }
58+
val messages = (6 to 8).map { i =>
59+
val event = if (i == 7) notSerializableEvent else s"b-$i"
60+
AtomicWrite(PersistentRepr(
61+
payload = event,
62+
sequenceNr = i,
63+
persistenceId = pid,
64+
sender = Actor.noSender,
65+
writerUuid = writerUuid
66+
))
67+
}
68+
journal ! WriteMessages(messages, probe.ref, actorInstanceId)
69+
probe.expectMsg(WriteMessagesSuccessful)
70+
71+
val Pid = pid
72+
val WriterUuid = writerUuid
73+
probe.expectMsgPF() {
74+
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) =>
75+
payload should be("b-6")
76+
}
77+
probe.expectMsgPF() {
78+
case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid), cause, _) =>
79+
payload should be(notSerializableEvent)
80+
cause.isInstanceOf[NotSerializableException] should be(true)
81+
}
82+
probe.expectMsgPF() {
83+
case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid), _) =>
84+
payload should be("b-8")
85+
}
86+
probe.expectNoMsg(1.second)
87+
}
88+
}
89+
}

0 commit comments

Comments
 (0)