Skip to content

Commit f90c6ac

Browse files
fclairambclaude
andauthored
feat(deflate): Adding support for transfer compression (#461)
Add MODE Z (deflate) transfer compression support per RFC-compliant FTP extensions. Key features: - Implement deflate compression for data transfers using compress/flate - Add TransferFinalizer interface to properly terminate compressed streams - Advertise MODE Z in FEAT response for client discovery - Disallow REST (resume) in deflate mode (streaming format limitation) - Handle closed connection errors gracefully Implementation details: - deflateReadWriter wraps connections with compression/decompression - TransferMode enum (Stream=0, Deflate=1) tracks current mode per client - MODE command accepts S (stream) and Z (deflate) parameters - Proper BFINAL block written on stream close for valid decompression Test coverage: - Upload and download with deflate compression - Mode switching between stream and deflate - Error paths for invalid compression levels - Unit tests for helper functions Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5d78269 commit f90c6ac

13 files changed

+685
-28
lines changed

client_handler.go

Lines changed: 139 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ftpserver
22

33
import (
44
"bufio"
5+
"compress/flate"
56
"errors"
67
"fmt"
78
"io"
@@ -33,6 +34,17 @@ const (
3334
TransferTypeBinary
3435
)
3536

37+
// TransferMode is the enumerable that represents the transfer mode (stream, block, compressed, deflate)
38+
type TransferMode int8
39+
40+
// Transfer modes
41+
const (
42+
// TransferModeStream is the standard uncompressed transfer mode
43+
TransferModeStream TransferMode = iota
44+
// TransferModeDeflate is the compressed transfer mode using deflate algorithm
45+
TransferModeDeflate
46+
)
47+
3648
// DataChannel is the enumerable that represents the data channel (active or passive)
3749
type DataChannel int8
3850

@@ -103,6 +115,7 @@ type clientHandler struct {
103115
debug bool // Show debugging info on the server side
104116
transferTLS bool // Use TLS for transfer connection
105117
controlTLS bool // Use TLS for control connection
118+
transferMode TransferMode // Transfer mode (stream, deflate)
106119
isTransferOpen bool // indicate if the transfer connection is opened
107120
isTransferAborted bool // indicate if the transfer was aborted
108121
connClosed bool // indicates if the connection has been commanded to close
@@ -660,7 +673,7 @@ func (c *clientHandler) GetTranferInfo() string {
660673
return c.transfer.GetInfo()
661674
}
662675

663-
func (c *clientHandler) TransferOpen(info string) (net.Conn, error) {
676+
func (c *clientHandler) TransferOpen(info string) (io.ReadWriter, error) {
664677
c.transferMu.Lock()
665678
defer c.transferMu.Unlock()
666679

@@ -696,6 +709,17 @@ func (c *clientHandler) TransferOpen(info string) (net.Conn, error) {
696709
return nil, err
697710
}
698711

712+
var transferStream io.ReadWriter = conn
713+
714+
if c.transferMode == TransferModeDeflate {
715+
transferStream, err = newDeflateTransfer(transferStream, c.server.settings.DeflateCompressionLevel)
716+
if err != nil {
717+
c.writeMessage(StatusActionNotTaken, fmt.Sprintf("Could not switch to deflate mode: %v", err))
718+
719+
return nil, fmt.Errorf("could not switch to deflate mode: %w", err)
720+
}
721+
}
722+
699723
c.isTransferOpen = true
700724
c.transfer.SetInfo(info)
701725

@@ -708,18 +732,56 @@ func (c *clientHandler) TransferOpen(info string) (net.Conn, error) {
708732
"localAddr", conn.LocalAddr().String())
709733
}
710734

711-
return conn, nil
735+
return transferStream, nil
736+
}
737+
738+
// Flusher is the interface that wraps the basic Flush method.
739+
type Flusher interface {
740+
Flush() error
741+
}
742+
743+
// TransferFinalizer is the interface for transfer streams that need explicit
744+
// finalization (e.g., deflate streams need to write end-of-stream markers).
745+
// This is distinct from closing the underlying connection.
746+
// We use FinalizeTransfer() instead of Close() to distinguish from io.Closer,
747+
// since net.Conn already implements io.Closer and we don't want to accidentally
748+
// close the underlying connection.
749+
type TransferFinalizer interface {
750+
FinalizeTransfer() error
712751
}
713752

714-
func (c *clientHandler) TransferClose(err error) {
753+
func (c *clientHandler) TransferClose(transfer io.ReadWriter, err error) {
715754
c.transferMu.Lock()
716755
defer c.transferMu.Unlock()
717756

757+
// Check if this is a transfer stream that needs explicit finalization (e.g., deflate).
758+
// TransferFinalizer.FinalizeTransfer() for deflate writes the end-of-stream marker AND flushes,
759+
// so we don't need to call Flush() separately.
760+
if finalizer, ok := transfer.(TransferFinalizer); ok {
761+
if errFinalize := finalizer.FinalizeTransfer(); errFinalize != nil {
762+
c.logger.Warn(
763+
"Error finalizing transfer stream",
764+
"err", errFinalize,
765+
)
766+
}
767+
} else if flush, ok := transfer.(Flusher); ok {
768+
// Only flush if NOT a TransferCloser (deflate's Close already flushes)
769+
if errFlush := flush.Flush(); errFlush != nil {
770+
c.logger.Warn(
771+
"Error flushing transfer connection",
772+
"err", errFlush,
773+
)
774+
}
775+
}
776+
777+
// Finally close the underlying connection.
778+
// "Use of closed network connection" is normal in FTP - the client can close
779+
// the connection when done, and we should treat this as success.
718780
errClose := c.closeTransfer()
719-
if errClose != nil {
781+
if errClose != nil && !isClosedConnError(errClose) {
720782
c.logger.Warn(
721783
"Problem closing transfer connection",
722-
"err", err,
784+
"err", errClose,
723785
)
724786
}
725787

@@ -730,6 +792,11 @@ func (c *clientHandler) TransferClose(err error) {
730792
return
731793
}
732794

795+
// Treat "connection already closed" as success - it means transfer completed
796+
if isClosedConnError(errClose) {
797+
errClose = nil
798+
}
799+
733800
switch {
734801
case err == nil && errClose == nil:
735802
c.writeMessage(StatusClosingDataConn, "Closing transfer connection")
@@ -740,6 +807,19 @@ func (c *clientHandler) TransferClose(err error) {
740807
}
741808
}
742809

810+
// isClosedConnError checks if the error indicates the connection is already closed.
811+
// This is normal FTP behavior - the client can close the connection when done.
812+
func isClosedConnError(err error) bool {
813+
if err == nil {
814+
return false
815+
}
816+
817+
errStr := err.Error()
818+
819+
return strings.Contains(errStr, "use of closed network connection") ||
820+
strings.Contains(errStr, "connection reset by peer")
821+
}
822+
743823
func (c *clientHandler) checkDataConnectionRequirement(dataConnIP net.IP, channelType DataChannel) error {
744824
var requirement DataConnectionRequirement
745825

@@ -821,3 +901,57 @@ func getMessageLines(message string) []string {
821901

822902
return lines
823903
}
904+
905+
// Compile-time checks that deflateReadWriter implements required interfaces
906+
var (
907+
_ Flusher = (*deflateReadWriter)(nil)
908+
_ TransferFinalizer = (*deflateReadWriter)(nil)
909+
)
910+
911+
type deflateReadWriter struct {
912+
reader io.ReadCloser // flate.NewReader returns io.ReadCloser
913+
writer *flate.Writer
914+
}
915+
916+
func (d *deflateReadWriter) Read(p []byte) (int, error) {
917+
return d.reader.Read(p)
918+
}
919+
920+
func (d *deflateReadWriter) Write(p []byte) (int, error) {
921+
return d.writer.Write(p)
922+
}
923+
924+
// Flush flushes buffered data to the underlying writer.
925+
func (d *deflateReadWriter) Flush() error {
926+
return d.writer.Flush()
927+
}
928+
929+
// FinalizeTransfer finalizes the deflate stream by writing the BFINAL block (end-of-stream marker).
930+
// This does NOT close the underlying connection - it only finalizes the deflate stream.
931+
func (d *deflateReadWriter) FinalizeTransfer() error {
932+
// Close the writer to write the BFINAL block
933+
if err := d.writer.Close(); err != nil {
934+
return fmt.Errorf("error closing deflate writer: %w", err)
935+
}
936+
937+
// Close the reader to release resources
938+
if err := d.reader.Close(); err != nil {
939+
return fmt.Errorf("error closing deflate reader: %w", err)
940+
}
941+
942+
return nil
943+
}
944+
945+
func newDeflateTransfer(conn io.ReadWriter, level int) (*deflateReadWriter, error) {
946+
writer, err := flate.NewWriter(conn, level)
947+
if err != nil {
948+
return nil, fmt.Errorf("could not create deflate writer: %w", err)
949+
}
950+
951+
reader := flate.NewReader(conn)
952+
953+
return &deflateReadWriter{
954+
reader: reader,
955+
writer: writer,
956+
}, nil
957+
}

client_handler_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ftpserver
22

33
import (
4+
"errors"
45
"fmt"
56
"net"
67
"slices"
@@ -490,3 +491,87 @@ func TestExtraData(t *testing.T) {
490491
require.Equal(t, k, extra)
491492
}
492493
}
494+
495+
var (
496+
errClosedConn = net.ErrClosed
497+
errConnReset = errors.New("connection reset by peer")
498+
errOther = errors.New("some other error")
499+
errWrappedClosed = fmt.Errorf("failed: %w", errClosedConn)
500+
)
501+
502+
func TestIsClosedConnError(t *testing.T) {
503+
t.Parallel()
504+
505+
tests := []struct {
506+
name string
507+
err error
508+
expected bool
509+
}{
510+
{name: "nil error", err: nil, expected: false},
511+
{name: "net.ErrClosed", err: errClosedConn, expected: true},
512+
{name: "connection reset by peer", err: errConnReset, expected: true},
513+
{name: "wrapped closed connection error", err: errWrappedClosed, expected: true},
514+
{name: "other error", err: errOther, expected: false},
515+
}
516+
517+
for _, tc := range tests {
518+
t.Run(tc.name, func(t *testing.T) {
519+
t.Parallel()
520+
result := isClosedConnError(tc.err)
521+
assert.Equal(t, tc.expected, result)
522+
})
523+
}
524+
}
525+
526+
func TestDeflateReadWriterFlush(t *testing.T) {
527+
t.Parallel()
528+
529+
// Create a buffer to write to
530+
buf := &mockReadWriter{}
531+
532+
// Create deflate transfer
533+
deflate, err := newDeflateTransfer(buf, 5)
534+
require.NoError(t, err)
535+
536+
// Write some data
537+
data := []byte("test data for deflate")
538+
n, err := deflate.Write(data)
539+
require.NoError(t, err)
540+
require.Equal(t, len(data), n)
541+
542+
// Test Flush
543+
err = deflate.Flush()
544+
require.NoError(t, err)
545+
546+
// Verify data was written to buffer
547+
require.Positive(t, buf.writeCount)
548+
}
549+
550+
func TestNewDeflateTransferInvalidLevel(t *testing.T) {
551+
t.Parallel()
552+
553+
buf := &mockReadWriter{}
554+
555+
// Test with invalid compression level (valid range is -2 to 9)
556+
_, err := newDeflateTransfer(buf, 100)
557+
require.Error(t, err)
558+
require.Contains(t, err.Error(), "could not create deflate writer")
559+
}
560+
561+
// mockReadWriter is a simple mock for testing
562+
type mockReadWriter struct {
563+
writeCount int
564+
readCount int
565+
}
566+
567+
func (m *mockReadWriter) Write(p []byte) (int, error) {
568+
m.writeCount++
569+
570+
return len(p), nil
571+
}
572+
573+
func (m *mockReadWriter) Read(_ []byte) (int, error) {
574+
m.readCount++
575+
576+
return 0, nil
577+
}

driver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ type Settings struct {
323323
DisableSTAT bool // Disable Server STATUS, STAT on files and directories will still work
324324
DisableSYST bool // Disable SYST
325325
EnableCOMB bool // Enable COMB support
326+
DeflateCompressionLevel int // Deflate compression level (1-9)
326327
DefaultTransferType TransferType // Transfer type to use if the client don't send the TYPE command
327328
// ActiveConnectionsCheck defines the security requirements for active connections
328329
ActiveConnectionsCheck DataConnectionRequirement

driver_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,12 @@ func (driver *TestServerDriver) Init() {
5555
}
5656

5757
{
58-
dir, _ := os.MkdirTemp("", "example")
59-
if err := os.MkdirAll(dir, 0o750); err != nil {
58+
driver.serverDir, _ = os.MkdirTemp("", "example")
59+
if err := os.MkdirAll(driver.serverDir, 0o750); err != nil {
6060
panic(err)
6161
}
6262

63-
driver.fs = afero.NewBasePathFs(afero.NewOsFs(), dir)
63+
driver.fs = afero.NewBasePathFs(afero.NewOsFs(), driver.serverDir)
6464
}
6565
}
6666

@@ -124,6 +124,7 @@ type authUserProvider func(user, pass string) (ClientDriver, error)
124124
type TestServerDriver struct {
125125
Clients []ClientContext
126126
errPassiveListener error
127+
serverDir string
127128
fs afero.Fs
128129
clientMU sync.Mutex
129130
AuthProvider authUserProvider

errors_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestCustomErrorsCode(t *testing.T) {
2424
func TestTransferCloseStorageExceeded(t *testing.T) {
2525
buf := bytes.Buffer{}
2626
h := clientHandler{writer: bufio.NewWriter(&buf)}
27-
h.TransferClose(ErrStorageExceeded)
27+
h.TransferClose(nil, ErrStorageExceeded)
2828
require.Equal(t, "552 Issue during transfer: storage limit exceeded\r\n", buf.String())
2929
}
3030

handle_dirs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (c *clientHandler) handleLIST(param string) error {
191191
if files, _, err := c.getFileList(param, true); err == nil || errors.Is(err, io.EOF) {
192192
if tr, errTr := c.TransferOpen(info); errTr == nil {
193193
err = c.dirTransferLIST(tr, files)
194-
c.TransferClose(err)
194+
c.TransferClose(tr, err)
195195

196196
return nil
197197
}
@@ -210,7 +210,7 @@ func (c *clientHandler) handleNLST(param string) error {
210210
if files, parentDir, err := c.getFileList(param, true); err == nil || errors.Is(err, io.EOF) {
211211
if tr, errTrOpen := c.TransferOpen(info); errTrOpen == nil {
212212
err = c.dirTransferNLST(tr, files, parentDir)
213-
c.TransferClose(err)
213+
c.TransferClose(tr, err)
214214

215215
return nil
216216
}
@@ -257,7 +257,7 @@ func (c *clientHandler) handleMLSD(param string) error {
257257
if files, _, err := c.getFileList(param, false); err == nil || errors.Is(err, io.EOF) {
258258
if tr, errTr := c.TransferOpen(info); errTr == nil {
259259
err = c.dirTransferMLSD(tr, files)
260-
c.TransferClose(err)
260+
c.TransferClose(tr, err)
261261

262262
return nil
263263
}

0 commit comments

Comments
 (0)