package eliofile import ( "context" "fmt" "os" "strings" "sync" "time" "gittea.marcokittel.de/elio/eliotools/tools" ) const FileExt = "csv" type filedata struct { filename string dt time.Time } // Daten über Channel aus der Goroutine ausführen. CounterID stellt sicher, dass das Herkunftsland der Daten bekannt ist. type CountryCsvData struct { CountryID string Data []string } type ElioHandleFunc func(ctx context.Context, filename string, data chan<- CountryCsvData) bool type ElioDateiFoo struct { process ElioHandleFunc lookUpDir string processedDir string mu sync.RWMutex haveDone map[string]time.Time } func NewElioDateiFoo(lookUpDir string, process ElioHandleFunc) *ElioDateiFoo { df := ElioDateiFoo{lookUpDir: lookUpDir, process: process, haveDone: make(map[string]time.Time), } return &df } // Ordner durchiterieren. Dateien ohne csv Endnung ausschließlen. // Datumsobjekt und Warenhaus sowie Lokalität extrahieren. Ungültige Dateien überspringen. // Warenhausnamen in eine eine Hashmap ablegen. Dopplungen mit dem spätesten Zeitpunkt behalten. // Veraltete Dateien mit Präfix no-import und Postifx .old benennen. // Gültige Dateinamen zur Extraktion in Closure übergeben. func (f *ElioDateiFoo) ScanCsv(ctx context.Context, data chan<- CountryCsvData) { select { case <-ctx.Done(): fmt.Println("Abbruch angefordert im Verarbeitungsprozess.") default: files, err := os.ReadDir(f.lookUpDir) if err != nil { fmt.Println(err) } filemap := make(map[string]filedata) for _, file := range files { fmt.Printf("Lese Datei: %s\n", file) if !strings.Contains(file.Name(), FileExt) { continue } tl, err := tools.ExtractDateAndConvertToDate(file.Name()) if err != nil { continue } warehouse := tools.ExtractWarehouseAndType(file.Name()) value, ok := filemap[warehouse] if !ok { filemap[warehouse] = filedata{filename: file.Name(), dt: tl} } else { if tl.Unix() >= value.dt.Unix() { err := os.Rename(f.lookUpDir+"/"+value.filename, f.lookUpDir+"/"+"no-import-"+value.filename+".old") if err != nil { fmt.Printf("Datei %s konnte nicht umbenannt werden!!!", value.filename) } filemap[warehouse] = filedata{filename: file.Name(), dt: tl} } } } for _, v := range filemap { f.mu.RLock() _, ok := f.haveDone[v.filename] f.mu.RUnlock() if ok { break } //Todo: Goroutine Einbauen, die alte Datensätze aus der Map entfernt f.mu.Lock() f.haveDone[v.filename] = time.Now() f.mu.Unlock() //Max 5 Sekunden zum Abarbeiten eines Datensatzes tctx, cancel := context.WithTimeout(ctx, time.Second*5) f.process(tctx, v.filename, data) cancel() } } }