Skip to content

Commit a53ea74

Browse files
committed
Refactor SIP payload header parsing and extraction.
Consolidated SIP payload header parsing into a dedicated reusable function, reducing redundancy and improving maintainability. Updated the relevant tests and logic to leverage the new implementation, ensuring consistent behavior across SIP header extraction processes.
1 parent 127e38f commit a53ea74

7 files changed

Lines changed: 159 additions & 39 deletions

decompression.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/xi2/xz"
1111
)
1212

13+
type progressLogger func(format string, args ...any)
14+
1315
func defaultTarReader(xzReader *xz.Reader) (func() (*tar.Header, error), func() io.Reader) {
1416
tarReader := tar.NewReader(xzReader)
1517

@@ -20,7 +22,7 @@ func defaultTarReader(xzReader *xz.Reader) (func() (*tar.Header, error), func()
2022
}
2123
}
2224

23-
func decompressTarXz(tarReader func(*xz.Reader) (func() (*tar.Header, error), func() io.Reader), path, extractPath string) error {
25+
func decompressTarXz(tarReader func(*xz.Reader) (func() (*tar.Header, error), func() io.Reader), path, extractPath string, logf progressLogger) error {
2426
extractDirectory := filepath.Dir(extractPath)
2527

2628
if err := os.MkdirAll(extractDirectory, os.ModePerm); err != nil {
@@ -54,6 +56,7 @@ func decompressTarXz(tarReader func(*xz.Reader) (func() (*tar.Header, error), fu
5456
}
5557

5658
readNext, reader := tarReader(xzReader)
59+
entryCount := 0
5760

5861
for {
5962
header, err := readNext()
@@ -79,6 +82,7 @@ func decompressTarXz(tarReader func(*xz.Reader) (func() (*tar.Header, error), fu
7982

8083
switch header.Typeflag {
8184
case tar.TypeReg:
85+
logProgress(logf, "extracting embedded postgres entry archive=%s entry=%s type=file target=%s size=%d", path, header.Name, finalPath, header.Size)
8286
outFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
8387
if err != nil {
8488
return errorExtractingPostgres(err)
@@ -92,6 +96,7 @@ func decompressTarXz(tarReader func(*xz.Reader) (func() (*tar.Header, error), fu
9296
return errorExtractingPostgres(err)
9397
}
9498
case tar.TypeSymlink:
99+
logProgress(logf, "extracting embedded postgres entry archive=%s entry=%s type=symlink target=%s link=%s", path, header.Name, finalPath, header.Linkname)
95100
if err := os.RemoveAll(targetPath); err != nil {
96101
return errorExtractingPostgres(err)
97102
}
@@ -101,20 +106,32 @@ func decompressTarXz(tarReader func(*xz.Reader) (func() (*tar.Header, error), fu
101106
}
102107

103108
case tar.TypeDir:
109+
logProgress(logf, "extracting embedded postgres entry archive=%s entry=%s type=dir target=%s", path, header.Name, finalPath)
104110
if err := os.MkdirAll(finalPath, os.FileMode(header.Mode)); err != nil {
105111
return errorExtractingPostgres(err)
106112
}
113+
entryCount++
107114
continue
108115
}
109116

110117
if err := renameOrIgnore(targetPath, finalPath); err != nil {
111118
return errorExtractingPostgres(err)
112119
}
120+
entryCount++
113121
}
114122

123+
logProgress(logf, "finished extracting embedded postgres archive archive=%s destination=%s entries=%d", path, extractPath, entryCount)
124+
115125
return nil
116126
}
117127

128+
func logProgress(logf progressLogger, format string, args ...any) {
129+
if logf == nil {
130+
return
131+
}
132+
logf(format, args...)
133+
}
134+
118135
func errorUnableToExtract(cacheLocation, binariesPath string, err error) error {
119136
return fmt.Errorf("unable to extract postgres archive %s to %s, if running parallel tests, configure RuntimePath to isolate testing directories, %w",
120137
cacheLocation,

decompression_test.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package embeddedpostgres
22

33
import (
44
"archive/tar"
5+
"bytes"
56
"errors"
67
"fmt"
78
"io"
@@ -28,7 +29,7 @@ func Test_decompressTarXz(t *testing.T) {
2829
archive, cleanUp := createTempXzArchive()
2930
defer cleanUp()
3031

31-
err = decompressTarXz(defaultTarReader, archive, tempDir)
32+
err = decompressTarXz(defaultTarReader, archive, tempDir, nil)
3233

3334
assert.NoError(t, err)
3435

@@ -42,7 +43,7 @@ func Test_decompressTarXz(t *testing.T) {
4243
}
4344

4445
func Test_decompressTarXz_ErrorWhenFileNotExists(t *testing.T) {
45-
err := decompressTarXz(defaultTarReader, "/does-not-exist", "/also-fake")
46+
err := decompressTarXz(defaultTarReader, "/does-not-exist", "/also-fake", nil)
4647

4748
assert.Error(t, err)
4849
assert.Contains(
@@ -68,7 +69,7 @@ func Test_decompressTarXz_ErrorWhenErrorDuringRead(t *testing.T) {
6869
return func() (*tar.Header, error) {
6970
return nil, errors.New("oh noes")
7071
}, nil
71-
}, archive, tempDir)
72+
}, archive, tempDir, nil)
7273

7374
assert.EqualError(t, err, "unable to extract postgres archive: oh noes")
7475
}
@@ -108,7 +109,7 @@ func Test_decompressTarXz_ErrorWhenFailedToReadFileToCopy(t *testing.T) {
108109
}
109110
}
110111

111-
err = decompressTarXz(fileBlockingExtractTarReader, archive, tempDir)
112+
err = decompressTarXz(fileBlockingExtractTarReader, archive, tempDir, nil)
112113

113114
assert.Regexp(t, "^unable to extract postgres archive:.+$", err)
114115
}
@@ -145,7 +146,7 @@ func Test_decompressTarXz_ErrorWhenFileToCopyToNotExists(t *testing.T) {
145146
}
146147
}
147148

148-
err = decompressTarXz(fileBlockingExtractTarReader, archive, tempDir)
149+
err = decompressTarXz(fileBlockingExtractTarReader, archive, tempDir, nil)
149150

150151
assert.Regexp(t, "^unable to extract postgres archive:.+$", err)
151152
}
@@ -180,7 +181,7 @@ func Test_decompressTarXz_ErrorWhenArchiveCorrupted(t *testing.T) {
180181
panic(err)
181182
}
182183

183-
err = decompressTarXz(defaultTarReader, archive, tempDir)
184+
err = decompressTarXz(defaultTarReader, archive, tempDir, nil)
184185

185186
assert.EqualError(t, err, "unable to extract postgres archive: xz: data is corrupt")
186187
}
@@ -197,10 +198,26 @@ func Test_decompressTarXz_ErrorWithInvalidDestination(t *testing.T) {
197198

198199
op := fmt.Sprintf(path.Join(tempDir, "%c"), rune(0))
199200

200-
err = decompressTarXz(defaultTarReader, archive, op)
201+
err = decompressTarXz(defaultTarReader, archive, op, nil)
201202
assert.EqualError(
202203
t,
203204
err,
204205
fmt.Sprintf("unable to extract postgres archive: mkdir %s: invalid argument", op),
205206
)
206207
}
208+
209+
func Test_decompressTarXz_LogsExtractedEntries(t *testing.T) {
210+
tempDir := t.TempDir()
211+
archive, cleanUp := createTempXzArchive()
212+
defer cleanUp()
213+
214+
var logs bytes.Buffer
215+
err := decompressTarXz(defaultTarReader, archive, tempDir, func(format string, args ...any) {
216+
_, _ = fmt.Fprintf(&logs, format+"\n", args...)
217+
})
218+
219+
require.NoError(t, err)
220+
assert.Contains(t, logs.String(), "extracting embedded postgres entry")
221+
assert.Contains(t, logs.String(), "entry=dir1/dir2/some_content")
222+
assert.Contains(t, logs.String(), "finished extracting embedded postgres archive")
223+
}

embedded_postgres.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ func (ep *EmbeddedPostgres) Start() error {
102102
ep.config.binariesPath = ep.config.runtimePath
103103
}
104104

105+
ep.logf(
106+
"embedded postgres binary setup cache=%s cache_exists=%t runtime_path=%s binaries_path=%s data_path=%s",
107+
cacheLocation,
108+
cacheExists,
109+
ep.config.runtimePath,
110+
ep.config.binariesPath,
111+
ep.config.dataPath,
112+
)
113+
105114
if err := ep.downloadAndExtractBinary(cacheExists, cacheLocation); err != nil {
106115
return err
107116
}
@@ -159,21 +168,36 @@ func (ep *EmbeddedPostgres) downloadAndExtractBinary(cacheExists bool, cacheLoca
159168
mu.Lock()
160169
defer mu.Unlock()
161170

162-
_, binDirErr := os.Stat(filepath.Join(ep.config.binariesPath, "bin", "pg_ctl"))
171+
pgCtlPath := filepath.Join(ep.config.binariesPath, "bin", "pg_ctl")
172+
_, binDirErr := os.Stat(pgCtlPath)
163173
if os.IsNotExist(binDirErr) {
164174
if !cacheExists {
165-
if err := ep.remoteFetchStrategy(); err != nil {
175+
ep.logf("downloading embedded postgres archive cache=%s", cacheLocation)
176+
if err := ep.remoteFetchStrategy(ep.logf); err != nil {
166177
return err
167178
}
179+
} else {
180+
ep.logf("using cached embedded postgres archive cache=%s", cacheLocation)
168181
}
169182

170-
if err := decompressTarXz(defaultTarReader, cacheLocation, ep.config.binariesPath); err != nil {
183+
ep.logf("extracting embedded postgres archive archive=%s destination=%s", cacheLocation, ep.config.binariesPath)
184+
if err := decompressTarXz(defaultTarReader, cacheLocation, ep.config.binariesPath, ep.logf); err != nil {
171185
return err
172186
}
187+
ep.logf("embedded postgres archive extracted archive=%s destination=%s", cacheLocation, ep.config.binariesPath)
188+
} else {
189+
ep.logf("embedded postgres binaries already available pg_ctl=%s", pgCtlPath)
173190
}
174191
return nil
175192
}
176193

194+
func (ep *EmbeddedPostgres) logf(format string, args ...any) {
195+
if ep == nil || ep.syncedLogger == nil {
196+
return
197+
}
198+
ep.syncedLogger.logf(format, args...)
199+
}
200+
177201
func (ep *EmbeddedPostgres) GetConnectionURL() string {
178202
return ep.config.GetConnectionURL()
179203
}

embedded_postgres_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ func Test_ErrorWhenPortAlreadyTaken(t *testing.T) {
6565
}
6666

6767
func Test_ErrorWhenRemoteFetchError(t *testing.T) {
68-
database := NewDatabase()
68+
database := NewDatabase(DefaultConfig().Port(9875))
6969
database.cacheLocator = func() (string, bool) {
7070
return "", false
7171
}
72-
database.remoteFetchStrategy = func() error {
72+
database.remoteFetchStrategy = func(progressLogger) error {
7373
return errors.New("did not work")
7474
}
7575

@@ -328,7 +328,7 @@ func Test_CustomLog(t *testing.T) {
328328
assert.Contains(t, lines, fmt.Sprintf("The files belonging to this database system will be owned by user \"%s\".", current.Username))
329329
assert.Contains(t, lines, "syncing data to disk ... ok")
330330
assert.Contains(t, lines, "server stopped")
331-
assert.Less(t, len(lines), 55)
331+
assert.Less(t, len(lines), 70)
332332
assert.Greater(t, len(lines), 40)
333333
}
334334

@@ -767,20 +767,20 @@ func Test_PrefetchedBinaries(t *testing.T) {
767767
RuntimePath(runtimeTempDir))
768768

769769
// Download and unarchive postgres into the bindir.
770-
if err := database.remoteFetchStrategy(); err != nil {
770+
if err := database.remoteFetchStrategy(nil); err != nil {
771771
panic(err)
772772
}
773773

774774
cacheLocation, _ := database.cacheLocator()
775-
if err := decompressTarXz(defaultTarReader, cacheLocation, binTempDir); err != nil {
775+
if err := decompressTarXz(defaultTarReader, cacheLocation, binTempDir, nil); err != nil {
776776
panic(err)
777777
}
778778

779779
// Expect everything to work without cacheLocator and/or remoteFetch abilities.
780780
database.cacheLocator = func() (string, bool) {
781781
return "", false
782782
}
783-
database.remoteFetchStrategy = func() error {
783+
database.remoteFetchStrategy = func(progressLogger) error {
784784
return errors.New("did not work")
785785
}
786786

logging.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,21 @@ func (s *syncedLogger) flush() error {
6161
return nil
6262
}
6363

64+
func (s *syncedLogger) logf(format string, args ...any) {
65+
if s == nil || s.file == nil {
66+
return
67+
}
68+
if _, err := fmt.Fprintf(s.file, format+"\n", args...); err != nil {
69+
panic(err)
70+
}
71+
if err := s.file.Sync(); err != nil {
72+
panic(err)
73+
}
74+
if err := s.flush(); err != nil {
75+
panic(err)
76+
}
77+
}
78+
6479
func readLogsOrTimeout(logger *os.File) (logContent []byte, err error) {
6580
logContent = []byte("logs could not be read")
6681

remote_fetch.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,19 @@ import (
1616
)
1717

1818
// RemoteFetchStrategy provides a strategy to fetch a Postgres binary so that it is available for use.
19-
type RemoteFetchStrategy func() error
19+
type RemoteFetchStrategy func(progressLogger) error
2020

2121
var freeBSDBinaryRepositoryURL = "https://web.sintel.com.tr/downloads/siper/pg"
2222

2323
//nolint:funlen
2424
func defaultRemoteFetchStrategy(remoteFetchHost string, versionStrategy VersionStrategy, cacheLocator CacheLocator) RemoteFetchStrategy {
25-
return func() error {
25+
return func(logf progressLogger) error {
2626
operatingSystem, architecture, version := versionStrategy()
2727
cacheLocation, _ := cacheLocator()
2828

2929
if freeBSDDirectDownloadURL, ok := freeBSDBundleDownloadURL(operatingSystem, architecture); ok {
30-
if err := downloadArchiveToCache(freeBSDDirectDownloadURL, cacheLocation); err != nil {
30+
logProgress(logf, "downloading embedded postgres archive url=%s cache=%s", freeBSDDirectDownloadURL, cacheLocation)
31+
if err := downloadArchiveToCache(freeBSDDirectDownloadURL, cacheLocation, logf); err != nil {
3132
return err
3233
}
3334
return nil
@@ -41,6 +42,7 @@ func defaultRemoteFetchStrategy(remoteFetchHost string, versionStrategy VersionS
4142
operatingSystem,
4243
architecture,
4344
version)
45+
logProgress(logf, "downloading embedded postgres bundle url=%s cache=%s", jarDownloadURL, cacheLocation)
4446

4547
jarDownloadResponse, err := http.Get(jarDownloadURL)
4648
if err != nil {
@@ -74,7 +76,7 @@ func defaultRemoteFetchStrategy(remoteFetchHost string, versionStrategy VersionS
7476
}
7577
}
7678

77-
return decompressResponse(jarBodyBytes, jarDownloadResponse.ContentLength, cacheLocator, jarDownloadURL)
79+
return decompressResponse(jarBodyBytes, jarDownloadResponse.ContentLength, cacheLocator, jarDownloadURL, logf)
7880
}
7981
}
8082

@@ -89,7 +91,7 @@ func freeBSDBundleDownloadURL(operatingSystem, architecture string) (string, boo
8991
}
9092
}
9193

92-
func downloadArchiveToCache(downloadURL, cacheLocation string) error {
94+
func downloadArchiveToCache(downloadURL, cacheLocation string, logf progressLogger) error {
9395
downloadResponse, err := http.Get(downloadURL)
9496
if err != nil {
9597
return fmt.Errorf("unable to connect to %s", downloadURL)
@@ -107,6 +109,7 @@ func downloadArchiveToCache(downloadURL, cacheLocation string) error {
107109
if err := os.MkdirAll(filepath.Dir(cacheLocation), 0755); err != nil {
108110
return errorExtractingPostgres(err)
109111
}
112+
logProgress(logf, "downloaded embedded postgres archive url=%s cache=%s bytes=%d", downloadURL, cacheLocation, len(archiveBytes))
110113
return writeArchiveAtomically(cacheLocation, archiveBytes)
111114
}
112115

@@ -149,7 +152,7 @@ func closeBody(resp *http.Response) func() {
149152
}
150153
}
151154

152-
func decompressResponse(bodyBytes []byte, contentLength int64, cacheLocator CacheLocator, downloadURL string) error {
155+
func decompressResponse(bodyBytes []byte, contentLength int64, cacheLocator CacheLocator, downloadURL string, logf progressLogger) error {
153156
size := contentLength
154157
// if the content length is not set (i.e. chunked encoding),
155158
// we need to use the length of the bodyBytes otherwise
@@ -170,6 +173,7 @@ func decompressResponse(bodyBytes []byte, contentLength int64, cacheLocator Cach
170173

171174
for _, file := range zipReader.File {
172175
if !file.FileHeader.FileInfo().IsDir() && strings.HasSuffix(file.FileHeader.Name, ".txz") {
176+
logProgress(logf, "writing embedded postgres archive from bundle url=%s entry=%s cache=%s", downloadURL, file.FileHeader.Name, cacheLocation)
173177
if err := decompressSingleFile(file, cacheLocation); err != nil {
174178
return err
175179
}

0 commit comments

Comments
 (0)