2424import java .io .*;
2525import java .nio .BufferOverflowException ;
2626import java .nio .ByteBuffer ;
27+ import java .nio .channels .ClosedChannelException ;
2728import java .nio .channels .FileChannel ;
2829import java .nio .channels .SeekableByteChannel ;
2930import java .nio .file .Files ;
@@ -219,7 +220,7 @@ public final class Journal implements Closeable {
219220 * the current output channel
220221 * Only valid after switchFiles() was called at least once!
221222 */
222- @ GuardedBy ("this" ) private FileChannel channel ;
223+ @ GuardedBy ("this" ) private JournalChannel channel ;
223224
224225 /**
225226 * the current journal file number
@@ -345,14 +346,10 @@ public synchronized void writeToLog(final Loggable entry) throws JournalExceptio
345346 flushToLog (false );
346347 }
347348
348- try {
349- // TODO(AR) this is needed as the journal is initialised by starting a transaction for loading the SymbolTable... before recovery! which is likely wrong!!! as Recovery Cannot run if the Journal file has been switched!
350- final long pos = channel != null ? channel .position () : 0 ;
349+ // TODO(AR) this is needed as the journal is initialised by starting a transaction for loading the SymbolTable... before recovery! which is likely wrong!!! as Recovery Cannot run if the Journal file has been switched!
350+ final long pos = channel != null ? channel .position () : 0 ;
351351
352- currentLsn = new Lsn (currentJournalFileNumber , pos + currentBuffer .position () + 1 );
353- } catch (final IOException e ) {
354- throw new JournalException ("Unable to create LSN for: " + entry .dump ());
355- }
352+ currentLsn = new Lsn (currentJournalFileNumber , pos + currentBuffer .position () + 1 );
356353 entry .setLsn (currentLsn );
357354
358355 try {
@@ -481,21 +478,18 @@ public synchronized void checkpoint(final long txnId, final boolean switchLogFil
481478 } else {
482479 flushToLog (true , true );
483480 }
484- try {
485- if (switchLogFiles && channel != null && channel .position () > journalSizeMin ) {
486- final Path oldFile = getFile (currentJournalFileNumber );
487- final RemoveRunnable removeRunnable = new RemoveRunnable (channel , oldFile ); // takes ownership of channel and oldFile when `start` is called
488- try {
489- switchFiles ();
490- } catch (final LogException e ) {
491- LOG .warn ("Failed to create new journal: {}" , e .getMessage (), e );
492- }
493481
494- final Thread removeThread = newInstanceThread (pool , "remove-journal" , removeRunnable );
495- removeThread .start ();
482+ if (switchLogFiles && channel != null && channel .position () > journalSizeMin ) {
483+ final Path oldFile = getFile (currentJournalFileNumber );
484+ final RemoveRunnable removeRunnable = new RemoveRunnable (channel , oldFile ); // takes ownership of channel and oldFile when `start` is called
485+ try {
486+ switchFiles ();
487+ } catch (final LogException e ) {
488+ LOG .warn ("Failed to create new journal: {}" , e .getMessage (), e );
496489 }
497- } catch (final IOException e ) {
498- LOG .warn ("IOException while writing checkpoint" , e );
490+
491+ final Thread removeThread = newInstanceThread (pool , "remove-journal" , removeRunnable );
492+ removeThread .start ();
499493 }
500494 }
501495
@@ -576,7 +570,7 @@ public synchronized void switchFiles() throws LogException {
576570 close ();
577571
578572 // open new journal file
579- channel = ( FileChannel ) Files .newByteChannel (newJournalFile , CREATE_NEW , WRITE );
573+ channel = new JournalChannel (( FileChannel ) Files .newByteChannel (newJournalFile , CREATE_NEW , WRITE ) );
580574 writeJournalHeader (channel );
581575 initialised = true ;
582576 currentJournalFileNumber = newJournalFileNumber ;
@@ -585,7 +579,7 @@ public synchronized void switchFiles() throws LogException {
585579 }
586580 }
587581
588- static void writeJournalHeader (final SeekableByteChannel channel ) throws IOException {
582+ static void writeJournalHeader (final JournalChannel channel ) throws IOException {
589583 final ByteBuffer buf = ByteBuffer .allocateDirect (JOURNAL_HEADER_LEN );
590584
591585 // write the magic number
@@ -775,10 +769,10 @@ static String getFileName(final short fileNum) throws IllegalArgumentException {
775769 }
776770
777771 private static class RemoveRunnable implements Runnable {
778- private final SeekableByteChannel channel ;
772+ private final JournalChannel channel ;
779773 private final Path path ;
780774
781- RemoveRunnable (final SeekableByteChannel channel , final Path path ) {
775+ RemoveRunnable (final JournalChannel channel , final Path path ) {
782776 this .channel = channel ;
783777 this .path = path ;
784778 }
@@ -795,4 +789,50 @@ public void run() {
795789 FileUtils .deleteQuietly (path );
796790 }
797791 }
792+
793+ /**
794+ * Wrap journal write channel so that operations are clearly restricted to serial writes
795+ */
796+ static class JournalChannel {
797+ private SeekableByteChannel channel ;
798+ private long position = -1 ;
799+
800+ JournalChannel (final SeekableByteChannel channel ) {
801+ this .channel = channel ;
802+ try {
803+ this .position = channel .position ();
804+ } catch (IOException e ) {
805+ throw new RuntimeException ("Journal created with closed channel " , e );
806+ }
807+ }
808+
809+ long size () throws IOException {
810+ return channel .size ();
811+ }
812+
813+ long position () {
814+ return position ;
815+ }
816+
817+ void force (final boolean metaData ) throws IOException {
818+ ((FileChannel )channel ).force (metaData );
819+ }
820+
821+ int write (final ByteBuffer src ) throws IOException {
822+ int bytesWritten = channel .write (src );
823+ position += bytesWritten ;
824+ return bytesWritten ;
825+ }
826+
827+ void close () throws IOException {
828+ try {
829+ if (channel != null ) {
830+ channel .close ();
831+ }
832+ } finally {
833+ channel = null ;
834+ position = -1 ;
835+ }
836+ }
837+ }
798838}
0 commit comments