diff --git a/backend/folddiscojob.go b/backend/folddiscojob.go index a97f3bb..a61a120 100644 --- a/backend/folddiscojob.go +++ b/backend/folddiscojob.go @@ -4,7 +4,9 @@ import ( "crypto/sha256" "encoding/base64" "errors" + "fmt" "os" + "path/filepath" "sort" "strings" ) @@ -14,8 +16,13 @@ type FoldDiscoJob struct { Database []string `json:"database" validate:"required"` // Mode string `json:"mode" validate:"oneof=3di tmalign 3diaa"` // TaxFilter string `json:"taxfilter"` - Motif string `json:"motif"` - query string + Motif string `json:"motif"` + // Motifs[i] holds one or more motif strings for queries[i]. + // Each motifs[] form value may contain multiple motifs separated by ";". + Motifs [][]string `json:"motifs,omitempty"` + Top int `json:"top,omitempty"` + query string + queries []string } const FolddiscoCacheVersion string = "v1" @@ -26,6 +33,15 @@ func (r FoldDiscoJob) Hash() Id { h.Write(([]byte)(JobFoldDisco)) h.Write([]byte(r.query)) h.Write([]byte(r.Motif)) + h.Write([]byte(fmt.Sprintf("%d", r.Top))) + for _, q := range r.queries { + h.Write([]byte(q)) + } + for _, ms := range r.Motifs { + for _, m := range ms { + h.Write([]byte(m)) + } + } // h.Write([]byte(r.Mode)) // if r.TaxFilter != "" { // h.Write([]byte(r.TaxFilter)) @@ -53,14 +69,56 @@ func (r FoldDiscoJob) WritePDB(path string) error { return nil } -func NewFoldDiscoJobRequest(query string, motif string, dbs []string, validDbs []Params, resultPath string, email string) (JobRequest, error) { +func (r FoldDiscoJob) IsBatch() bool { + return len(r.Motifs) > 0 +} + +func (r FoldDiscoJob) WriteBatchFiles(basePath string) error { + var lines []string + for i, q := range r.queries { + ext := ".pdb" + if len(q) > 0 && ismmCIFFirstLine(strings.TrimSpace(q)) { + ext = ".cif" + } + filename := fmt.Sprintf("job_%d%s", i, ext) + fp := filepath.Join(basePath, filename) + if err := os.WriteFile(fp, []byte(q), 0644); err != nil { + return err + } + // Each query can have multiple motifs; emit one batch line per motif. + for _, m := range r.Motifs[i] { + lines = append(lines, fp+"\t"+m) + } + } + content := strings.Join(lines, "\n") + "\n" + return os.WriteFile(filepath.Join(basePath, "query_batch.txt"), []byte(content), 0644) +} + +func validateFolddiscoDatabases(dbs []string, validDbs []Params) error { + ids := make([]string, 0) + for _, item := range validDbs { + if item.Motif { + ids = append(ids, item.Path) + } + } + for _, item := range dbs { + if isIn(item, ids) == -1 { + return errors.New("selected databases are not valid") + } + } + return nil +} + +func NewFoldDiscoJobRequest(query string, motif string, dbs []string, validDbs []Params, resultPath string, email string, top int) (JobRequest, error) { + if top <= 0 { + top = 1000 + } job := FoldDiscoJob{ - max(strings.Count(query, "HEADER"), 1), - dbs, - // mode, - // taxfilter, - motif, - query, + Size: max(strings.Count(query, "HEADER"), 1), + Database: dbs, + Motif: motif, + Top: top, + query: query, } request := JobRequest{ @@ -71,23 +129,57 @@ func NewFoldDiscoJobRequest(query string, motif string, dbs []string, validDbs [ email, } - ids := make([]string, 0) - for _, item := range validDbs { - if item.Motif { - ids = append(ids, item.Path) - } + if err := validateFolddiscoDatabases(dbs, validDbs); err != nil { + return request, err } - for _, item := range job.Database { - idx := isIn(item, ids) - if idx == -1 { - return request, errors.New("selected databases are not valid") + return request, nil +} + +// NewFoldDiscoBatchJobRequest creates a batch Folddisco job. motifs[i] contains +// the motifs for queries[i]; each entry may hold more than one motif so that a +// single structure can be searched with multiple motif specifications without +// re-uploading the file. +func NewFoldDiscoBatchJobRequest(queries []string, motifs [][]string, dbs []string, validDbs []Params, resultPath string, email string, top int) (JobRequest, error) { + if len(queries) != len(motifs) { + return JobRequest{}, errors.New("queries and motifs must have the same length") + } + if len(queries) == 0 { + return JobRequest{}, errors.New("at least one query is required") + } + for i, ms := range motifs { + if len(ms) == 0 { + return JobRequest{}, fmt.Errorf("query %d has no motifs", i) } } - // if !validTaxonFilter(taxfilter) { - // return request, errors.New("invalid taxon filter") - // } + totalSize := 0 + for _, q := range queries { + totalSize += max(strings.Count(q, "HEADER"), 1) + } + + if top <= 0 { + top = 1000 + } + job := FoldDiscoJob{ + Size: totalSize, + Database: dbs, + Motifs: motifs, + Top: top, + queries: queries, + } + + request := JobRequest{ + job.Hash(), + StatusPending, + JobFoldDisco, + job, + email, + } + + if err := validateFolddiscoDatabases(dbs, validDbs); err != nil { + return request, err + } return request, nil } diff --git a/backend/jobsystem.go b/backend/jobsystem.go index 37a72da..ee42b95 100644 --- a/backend/jobsystem.go +++ b/backend/jobsystem.go @@ -159,6 +159,9 @@ func (m *JobRequest) WriteSupportFiles(base string) error { return errors.New("invalid job type") case JobFoldDisco: if j, ok := m.Job.(FoldDiscoJob); ok { + if j.IsBatch() { + return j.WriteBatchFiles(base) + } return j.WritePDB(filepath.Join(base, "job.pdb")) } return errors.New("invalid job type") diff --git a/backend/server.go b/backend/server.go index 334c3bb..2e1d4e7 100644 --- a/backend/server.go +++ b/backend/server.go @@ -543,9 +543,12 @@ func server(jobsystem JobSystem, config ConfigRoot) { ticketFolddiscoHandlerFunc := func(w http.ResponseWriter, req *http.Request) { var query string var motif string + var queries []string + var rawMotifs []string var dbs []string //var mode string var email string + var top int //var iterativesearch bool // var taxfilter string @@ -556,19 +559,40 @@ func server(jobsystem JobSystem, config ConfigRoot) { return } - f, _, err := req.FormFile("q") - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + // Batch mode: multiple structures via queries[] file fields + if files := req.MultipartForm.File["queries[]"]; len(files) > 0 { + for _, fh := range files { + file, err := fh.Open() + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + buf := new(bytes.Buffer) + buf.ReadFrom(file) + file.Close() + queries = append(queries, buf.String()) + } + } else if formQueries := req.Form["queries[]"]; len(formQueries) > 0 { + queries = formQueries + } + + if len(queries) > 0 { + rawMotifs = req.Form["motifs[]"] + } else { + f, _, err := req.FormFile("q") + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + buf := new(bytes.Buffer) + buf.ReadFrom(f) + query = buf.String() + motif = req.FormValue("motif") } - buf := new(bytes.Buffer) - buf.ReadFrom(f) - query = buf.String() dbs = req.Form["database[]"] //mode = req.FormValue("mode") email = req.FormValue("email") - motif = req.FormValue("motif") // taxfilter = req.FormValue("taxfilter") } else { err := req.ParseForm() @@ -576,21 +600,44 @@ func server(jobsystem JobSystem, config ConfigRoot) { http.Error(w, err.Error(), http.StatusBadRequest) return } - query = req.FormValue("q") + + queries = req.Form["queries[]"] + rawMotifs = req.Form["motifs[]"] + + if len(queries) == 0 { + query = req.FormValue("q") + motif = req.FormValue("motif") + } + dbs = req.Form["database[]"] //mode = req.FormValue("mode") email = req.FormValue("email") - motif = req.FormValue("motif") // taxfilter = req.FormValue("taxfilter") } + if topStr := req.FormValue("top"); topStr != "" { + top, _ = strconv.Atoi(topStr) + } + databases, err := Databases(config.Paths.Databases, true) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - request, err := NewFoldDiscoJobRequest(query, motif, dbs, databases /*mode,*/, config.Paths.Results, email /*, taxfilter*/) + // Each rawMotifs[] entry may contain multiple motifs separated by ";", + // allowing a single uploaded structure to be searched with several motifs. + var motifs [][]string + for _, raw := range rawMotifs { + motifs = append(motifs, strings.Split(raw, ";")) + } + + var request JobRequest + if len(queries) > 0 { + request, err = NewFoldDiscoBatchJobRequest(queries, motifs, dbs, databases, config.Paths.Results, email, top) + } else { + request, err = NewFoldDiscoJobRequest(query, motif, dbs, databases, config.Paths.Results, email, top) + } if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/backend/worker.go b/backend/worker.go index e82ce27..c5849af 100644 --- a/backend/worker.go +++ b/backend/worker.go @@ -133,6 +133,49 @@ func execCommandSync(verbose bool, parameters []string, environ []string, timeou var fasta3DiInput = regexp.MustCompile(`^>.*?\n.*?\n>3DI.*?\n.*?\n`).MatchString +func ismmCIFFirstLine(line string) bool { + return strings.HasPrefix(line, "#") || strings.HasPrefix(line, "data_") +} + +type batchLine struct { + Structure string + Motif string +} + +func readBatchLines(path string) ([]batchLine, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + var lines []batchLine + for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") { + parts := strings.SplitN(line, "\t", 2) + if len(parts) != 2 { + continue + } + lines = append(lines, batchLine{Structure: parts[0], Motif: parts[1]}) + } + return lines, nil +} + +func concatFiles(srcs []string, dst string) error { + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + for _, src := range srcs { + data, err := os.ReadFile(src) + if err != nil { + return err + } + if _, err := out.Write(data); err != nil { + return err + } + } + return nil +} + func ismmCIFFile(filePath string) (bool, error) { file, err := os.Open(filePath) if err != nil { @@ -156,10 +199,7 @@ func ismmCIFFile(filePath string) (bool, error) { return false, errors.New("empty file") } - if strings.HasPrefix(firstLine, "#") || strings.HasPrefix(firstLine, "data_") { - return true, nil - } - return false, nil + return ismmCIFFirstLine(firstLine), nil } func RunJob(request JobRequest, config ConfigRoot) (err error) { @@ -1654,22 +1694,27 @@ rm -rf -- "${BASE}/tmp" maxParallel := config.Worker.ParallelDatabases semaphore := make(chan struct{}, max(1, maxParallel)) - inputFile := filepath.Join(resultBase, "job.pdb") + isBatch := job.IsBatch() + inputFile := "" + batchFile := filepath.Join(resultBase, "query_batch.txt") - isCif, err := ismmCIFFile(inputFile) - if err != nil { - return &JobExecutionError{err} - } - - if isCif { - newFilePath := filepath.Join(resultBase, "job.cif") - if err := os.Rename(inputFile, newFilePath); err != nil { + if !isBatch { + inputFile = filepath.Join(resultBase, "job.pdb") + isCif, err := ismmCIFFile(inputFile) + if err != nil { return &JobExecutionError{err} } - inputFile = newFilePath + if isCif { + newFilePath := filepath.Join(resultBase, "job.cif") + if err := os.Rename(inputFile, newFilePath); err != nil { + return &JobExecutionError{err} + } + inputFile = newFilePath + } } motif := job.Motif + top := strconv.Itoa(job.Top) for index, database := range job.Database { wg.Add(1) @@ -1682,12 +1727,6 @@ rm -rf -- "${BASE}/tmp" errChan <- &JobExecutionError{err} return } - if !params.Motif { - err := errors.New("Database is not a folddisco database") - errChan <- &JobExecutionError{err} - return - } - threads := 16 for _, kv := range os.Environ() { if strings.HasPrefix(kv, "MMSEQS_NUM_THREADS=") { @@ -1712,125 +1751,188 @@ rm -rf -- "${BASE}/tmp" if params.OverridePath != "" { dbpath = filepath.Clean(params.OverridePath) } - parameters := []string{ - config.Paths.FoldDisco, - "query", - "-p", - inputFile, - "-q", motif, - "-i", - dbpath, - "-o", - filepath.Join(resultBase, "alis_"+database), - "--top", - "1000", - "--superpose", - "--partial-fit", - "-t", - strconv.Itoa(threads), - } - parameters = append(parameters, strings.Fields(params.Search)...) - cmd, done, err := execCommand(config.Verbose, parameters, []string{}) - if err != nil { - errChan <- &JobExecutionError{err} - return - } + outputFile := filepath.Join(resultBase, "alis_"+database) + extraArgs := strings.Fields(params.Search) - select { - case <-time.After(1 * time.Hour): - if err := KillCommand(cmd); err != nil { - log.Printf("Failed to kill: %s\n", err) + if isBatch { + // Generate a 3-column batch file with per-query output paths + // (col3 tells folddisco to write each query's results to its + // own file instead of stdout, avoiding interleaved output). + batchLines, err := readBatchLines(batchFile) + if err != nil { + errChan <- &JobExecutionError{err} + return } - errChan <- &JobTimeoutError{} - case err := <-done: + var partFiles []string + dbBatch := filepath.Join(resultBase, fmt.Sprintf("batch_%s.txt", database)) + var batchContent strings.Builder + for i, bl := range batchLines { + partFile := filepath.Join(resultBase, fmt.Sprintf("alis_%s_part_%d", database, i)) + partFiles = append(partFiles, partFile) + batchContent.WriteString(bl.Structure + "\t" + bl.Motif + "\t" + partFile + "\n") + } + if err := os.WriteFile(dbBatch, []byte(batchContent.String()), 0644); err != nil { + errChan <- &JobExecutionError{err} + return + } + + parameters := []string{ + config.Paths.FoldDisco, + "query", + "-q", dbBatch, + "-i", dbpath, + "--top", top, + "--superpose", + "--partial-fit", + "-t", strconv.Itoa(threads), + } + parameters = append(parameters, extraArgs...) + + cmd, done, err := execCommand(config.Verbose, parameters, []string{}) if err != nil { errChan <- &JobExecutionError{err} - } else { - if params.FullHeader || params.Taxonomy { - foldseekDb := strings.Replace(dbpath, "_folddisco", "", 1) - if fileExists(foldseekDb + ".dbtype") { - columns := "bits" - if params.FullHeader { - columns += ",theader" - } else { - columns += ",empty" - } - if params.Taxonomy { - columns += ",taxid,taxname" - } - // make fake foldseek dbs so we can call convertalis for description and taxonomy - parameters = []string{ - "awk", - "-v", - "db=" + filepath.Join(resultBase, "alis_"+database+"_db"), - `BEGIN { printf("") > db; printf("") > db"_seq"; printf("") > db"_seq_h"; } + return + } + select { + case <-time.After(1 * time.Hour): + if err := KillCommand(cmd); err != nil { + log.Printf("Failed to kill: %s\n", err) + } + errChan <- &JobTimeoutError{} + return + case err := <-done: + if err != nil { + errChan <- &JobExecutionError{err} + return + } + } + + if err := concatFiles(partFiles, outputFile); err != nil { + errChan <- &JobExecutionError{err} + return + } + for _, f := range partFiles { + os.Remove(f) + } + os.Remove(dbBatch) + } else { + parameters := []string{ + config.Paths.FoldDisco, + "query", + "-p", inputFile, + "-q", motif, + "-i", dbpath, + "-o", outputFile, + "--top", top, + "--superpose", + "--partial-fit", + "-t", strconv.Itoa(threads), + } + parameters = append(parameters, extraArgs...) + + cmd, done, err := execCommand(config.Verbose, parameters, []string{}) + if err != nil { + errChan <- &JobExecutionError{err} + return + } + select { + case <-time.After(1 * time.Hour): + if err := KillCommand(cmd); err != nil { + log.Printf("Failed to kill: %s\n", err) + } + errChan <- &JobTimeoutError{} + return + case err := <-done: + if err != nil { + errChan <- &JobExecutionError{err} + return + } + } + } + + // Post-processing: convertalis for headers/taxonomy + if params.FullHeader || params.Taxonomy { + foldseekDb := strings.Replace(dbpath, "_folddisco", "", 1) + if fileExists(foldseekDb + ".dbtype") { + columns := "bits" + if params.FullHeader { + columns += ",theader" + } else { + columns += ",empty" + } + if params.Taxonomy { + columns += ",taxid,taxname" + } + // make fake foldseek dbs so we can call convertalis for description and taxonomy + err = execCommandSync( + config.Verbose, + []string{ + "awk", + "-v", + "db=" + filepath.Join(resultBase, "alis_"+database+"_db"), + `BEGIN { printf("") > db; printf("") > db"_seq"; printf("") > db"_seq_h"; } !($9 in f) { entry = $9"\t"$9"\t0.00\t0\t0\t0\t0\t0\t0\t0\t0"; print(entry) >> db; len += length(entry) + 1; f[$9] = 1; } END { printf("%c", 0) >> db; printf("%c", 0) >> db"_seq"; printf("%c", 0) >> db"_seq_h"; print "0\t0\t"(len + 1) > db".index"; print "0\t0\t0" > db"_seq.index"; print "0\t0\t0" > db"_seq_h.index"; printf("%c%c%c%c",5,0,0,0) > db".dbtype"; printf("%c%c%c%c",0,0,0,0) > db"_seq.dbtype"; printf("%c%c%c%c",11,0,0,0) > db"_seq_h.dbtype" }`, - filepath.Join(resultBase, "alis_"+database), - } - err = execCommandSync( - config.Verbose, - parameters, - []string{}, - 1*time.Minute, - ) - if err != nil { - errChan <- &JobExecutionError{err} - return - } - err = execCommandSync( - config.Verbose, - []string{ - config.Paths.Foldseek, - "convertalis", - filepath.Join(resultBase, "alis_"+database+"_db_seq"), - foldseekDb, - filepath.Join(resultBase, "alis_"+database+"_db"), - filepath.Join(resultBase, "alis_"+database+".tsv"), - "--format-output", - columns, - "--db-load-mode", - "2", - "--db-output", - "0", - }, - []string{}, - 1*time.Minute, - ) - if err != nil { - errChan <- &JobExecutionError{err} - return - } - if params.Taxonomy { - err = execCommandSync( - config.Verbose, - []string{ - config.Paths.Foldseek, - "taxonomyreport", - foldseekDb, - filepath.Join(resultBase, "alis_"+database+"_db"), - filepath.Join(resultBase, "alis_"+database+"_report"), - "--report-mode", - "3", - }, - []string{}, - 1*time.Minute, - ) - if err != nil { - errChan <- &JobExecutionError{err} - return - } - } + filepath.Join(resultBase, "alis_"+database), + }, + []string{}, + 1*time.Minute, + ) + if err != nil { + errChan <- &JobExecutionError{err} + return + } + err = execCommandSync( + config.Verbose, + []string{ + config.Paths.Foldseek, + "convertalis", + filepath.Join(resultBase, "alis_"+database+"_db_seq"), + foldseekDb, + filepath.Join(resultBase, "alis_"+database+"_db"), + filepath.Join(resultBase, "alis_"+database+".tsv"), + "--format-output", + columns, + "--db-load-mode", + "2", + "--db-output", + "0", + }, + []string{}, + 1*time.Minute, + ) + if err != nil { + errChan <- &JobExecutionError{err} + return + } + if params.Taxonomy { + err = execCommandSync( + config.Verbose, + []string{ + config.Paths.Foldseek, + "taxonomyreport", + foldseekDb, + filepath.Join(resultBase, "alis_"+database+"_db"), + filepath.Join(resultBase, "alis_"+database+"_report"), + "--report-mode", + "3", + }, + []string{}, + 1*time.Minute, + ) + if err != nil { + errChan <- &JobExecutionError{err} + return } } - - errChan <- nil } } + + errChan <- nil }(index, database) }