Probleme mit Deadlocks durch Timeout im Ctx verhindern.
This commit is contained in:
parent
b3367d8e5e
commit
6f75f05e06
|
|
@ -48,55 +48,52 @@ func NewElioDateiFoo(lookUpDir string, process ElioHandleFunc) *ElioDateiFoo {
|
||||||
// Veraltete Dateien mit Präfix no-import und Postifx .old benennen.
|
// Veraltete Dateien mit Präfix no-import und Postifx .old benennen.
|
||||||
// Gültige Dateinamen zur Extraktion in Closure übergeben.
|
// Gültige Dateinamen zur Extraktion in Closure übergeben.
|
||||||
func (f *ElioDateiFoo) ScanCsv(ctx context.Context, data chan<- CountryCsvData) {
|
func (f *ElioDateiFoo) ScanCsv(ctx context.Context, data chan<- CountryCsvData) {
|
||||||
select {
|
files, err := os.ReadDir(f.lookUpDir)
|
||||||
case <-ctx.Done():
|
if err != nil {
|
||||||
fmt.Println("Abbruch angefordert im Verarbeitungsprozess.")
|
fmt.Println(err)
|
||||||
default:
|
}
|
||||||
files, err := os.ReadDir(f.lookUpDir)
|
|
||||||
|
filemap := make(map[string]filedata)
|
||||||
|
for _, file := range files {
|
||||||
|
fmt.Printf("Lese Datei: %s\n", file)
|
||||||
|
if !strings.Contains(file.Name(), FileExt) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tl, err := tools.ExtractDateAndConvertToDate(file.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
continue
|
||||||
}
|
}
|
||||||
|
warehouse := tools.ExtractWarehouseAndType(file.Name())
|
||||||
filemap := make(map[string]filedata)
|
value, ok := filemap[warehouse]
|
||||||
for _, file := range files {
|
if !ok {
|
||||||
fmt.Printf("Lese Datei: %s\n", file)
|
filemap[warehouse] = filedata{filename: file.Name(), dt: tl}
|
||||||
if !strings.Contains(file.Name(), FileExt) {
|
} else {
|
||||||
continue
|
if tl.Unix() >= value.dt.Unix() {
|
||||||
}
|
err := os.Rename(f.lookUpDir+"/"+value.filename, f.lookUpDir+"/"+"no-import-"+value.filename+".old")
|
||||||
tl, err := tools.ExtractDateAndConvertToDate(file.Name())
|
if err != nil {
|
||||||
if err != nil {
|
fmt.Printf("Datei %s konnte nicht umbenannt werden!!!", value.filename)
|
||||||
continue
|
|
||||||
}
|
|
||||||
warehouse := tools.ExtractWarehouseAndType(file.Name())
|
|
||||||
value, ok := filemap[warehouse]
|
|
||||||
if !ok {
|
|
||||||
filemap[warehouse] = filedata{filename: file.Name(), dt: tl}
|
|
||||||
} else {
|
|
||||||
if tl.Unix() >= value.dt.Unix() {
|
|
||||||
err := os.Rename(f.lookUpDir+"/"+value.filename, f.lookUpDir+"/"+"no-import-"+value.filename+".old")
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Datei %s konnte nicht umbenannt werden!!!", value.filename)
|
|
||||||
}
|
|
||||||
filemap[warehouse] = filedata{filename: file.Name(), dt: tl}
|
|
||||||
}
|
}
|
||||||
|
filemap[warehouse] = filedata{filename: file.Name(), dt: tl}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, v := range filemap {
|
for _, v := range filemap {
|
||||||
f.mu.RLock()
|
f.mu.RLock()
|
||||||
_, ok := f.haveDone[v.filename]
|
_, ok := f.haveDone[v.filename]
|
||||||
f.mu.RUnlock()
|
f.mu.RUnlock()
|
||||||
if ok {
|
if ok {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
//Todo: Goroutine Einbauen, die alte Datensätze aus der Map entfernt
|
//Todo: Goroutine Einbauen, die alte Datensätze aus der Map entfernt
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
f.haveDone[v.filename] = time.Now()
|
f.haveDone[v.filename] = time.Now()
|
||||||
f.mu.Unlock()
|
f.mu.Unlock()
|
||||||
|
go func() {
|
||||||
//Max 5 Sekunden zum Abarbeiten eines Datensatzes
|
//Max 5 Sekunden zum Abarbeiten eines Datensatzes
|
||||||
tctx, cancel := context.WithTimeout(ctx, time.Second*5)
|
tctx, cancel := context.WithTimeout(ctx, time.Second*5)
|
||||||
f.process(tctx, v.filename, data)
|
f.process(tctx, v.filename, data)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue