Context tiefer leiten. Blockierung stört.

This commit is contained in:
Marco Kittel 2025-06-29 08:59:15 +02:00
parent 504cebebb9
commit 5858e70c0d
1 changed files with 42 additions and 35 deletions

View File

@ -1,6 +1,7 @@
package eliofile package eliofile
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"strings" "strings"
@ -46,48 +47,54 @@ func NewElioDateiFoo(lookUpDir string, process ElioHandleFunc) *ElioDateiFoo {
// Warenhausnamen in eine eine Hashmap ablegen. Dopplungen mit dem spätesten Zeitpunkt behalten. // Warenhausnamen in eine eine Hashmap ablegen. Dopplungen mit dem spätesten Zeitpunkt behalten.
// Veraltete Dateien mit Präfix no-import und Postifx .old benennen. // Veraltete Dateien mit Präfix no-import und Postifx .old benennen.
// Gültige Dateinamen zur Extraktion in Closure übergeben. // Gültige Dateinamen zur Extraktion in Closure übergeben.
func (f *ElioDateiFoo) ScanCsv(data chan<- CountryCsvData) { func (f *ElioDateiFoo) ScanCsv(ctx context.Context, data chan<- CountryCsvData) {
defer close(data) defer close(data)
files, err := os.ReadDir(f.lookUpDir) select {
if err != nil { case <-ctx.Done():
fmt.Println(err) fmt.Println("Abbruch angefordert im Verarbeitungsprozess.")
} default:
files, err := os.ReadDir(f.lookUpDir)
filemap := make(map[string]filedata)
for _, file := range files {
if !strings.Contains(file.Name(), FileExt) {
continue
}
tl, err := tools.ExtractDateAndConvertToDate(file.Name())
if err != nil { if err != nil {
continue fmt.Println(err)
} }
warehouse := tools.ExtractWarehouseAndType(file.Name())
value, ok := filemap[warehouse] filemap := make(map[string]filedata)
if !ok { for _, file := range files {
filemap[warehouse] = filedata{filename: file.Name(), dt: tl} fmt.Printf("Lese Datei: %s\n", file)
} else { if !strings.Contains(file.Name(), FileExt) {
if tl.Unix() >= value.dt.Unix() { continue
err := os.Rename(f.lookUpDir+"/"+value.filename, f.lookUpDir+"/"+"no-import-"+value.filename+".old") }
if err != nil { tl, err := tools.ExtractDateAndConvertToDate(file.Name())
fmt.Printf("Datei %s konnte nicht umbenannt werden!!!", value.filename) if err != nil {
} continue
}
warehouse := tools.ExtractWarehouseAndType(file.Name())
value, ok := filemap[warehouse]
if !ok {
filemap[warehouse] = filedata{filename: file.Name(), dt: tl} filemap[warehouse] = filedata{filename: file.Name(), dt: tl}
} else {
if tl.Unix() >= value.dt.Unix() {
err := os.Rename(f.lookUpDir+"/"+value.filename, f.lookUpDir+"/"+"no-import-"+value.filename+".old")
if err != nil {
fmt.Printf("Datei %s konnte nicht umbenannt werden!!!", value.filename)
}
filemap[warehouse] = filedata{filename: file.Name(), dt: tl}
}
} }
} }
}
for _, v := range filemap { for _, v := range filemap {
f.mu.RLock() f.mu.RLock()
_, ok := f.haveDone[v.filename] _, ok := f.haveDone[v.filename]
f.mu.RUnlock() f.mu.RUnlock()
if ok { if ok {
break break
}
//Todo: Goroutine Einbauen, die alte Datensätze aus der Map entfernt
f.mu.Lock()
f.haveDone[v.filename] = time.Now()
f.mu.Unlock()
go f.process(v.filename, data)
} }
//Todo: Goroutine Einbauen, die alte Datensätze aus der Map entfernt
f.mu.Lock()
f.haveDone[v.filename] = time.Now()
f.mu.Unlock()
go f.process(v.filename, data)
} }
} }