186 lines
5.0 KiB
Go
186 lines
5.0 KiB
Go
package dataservice
|
|
|
|
import (
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"gittea.marcokittel.de/elio/eliotools/datawriter/internal/event"
	"gittea.marcokittel.de/elio/eliotools/eliofile"
	"gittea.marcokittel.de/elio/eliotools/logger"
	"gittea.marcokittel.de/elio/eliotools/tools"
)
|
|
|
|
// Daten über Channel aus der Goroutine ausführen. CounterID stellt sicher, dass das Herkunftsland der Daten bekannt ist.
|
|
|
|
// Rückgabe eines Closures zur Verbeitung einer CSV Datei. Posix Only. No Windows.
|
|
func DataServiceDebugHandler(log logger.Logger, baseDir string, processedDir string) func(ctx context.Context, filename string, data chan<- eliofile.CountryCsvData) bool {
|
|
|
|
assignDataType := func(csvType *eliofile.CountryCsvData, filename string) {
|
|
if strings.Contains(filename, "stock") {
|
|
csvType.Type = "stock"
|
|
} else if strings.Contains(filename, "delivery") {
|
|
csvType.Type = "delivery"
|
|
}
|
|
}
|
|
return func(ctx context.Context, filename string, data chan<- eliofile.CountryCsvData) bool {
|
|
log.Infof("%s %s \n", baseDir, processedDir)
|
|
if !tools.IsFilenameValid(filename) {
|
|
log.Warning(filename + "Überspringe Datei: " + filename + "\n")
|
|
return false
|
|
}
|
|
t, err := tools.ExtractDateAndConvertToDate(filename)
|
|
if err != nil {
|
|
log.Warning(filename + "\n" + err.Error())
|
|
return false
|
|
}
|
|
log.Info("Datei gefunden.: " + filename + " " + t.String() + "\n")
|
|
file, err := os.Open(baseDir + "/" + filename)
|
|
defer file.Close()
|
|
if err != nil {
|
|
log.Warning(err.Error())
|
|
}
|
|
r := csv.NewReader(file)
|
|
record, err := r.Read()
|
|
if err != nil {
|
|
log.Critical(err.Error())
|
|
return false
|
|
}
|
|
for {
|
|
record, err = r.Read()
|
|
if err != nil {
|
|
break
|
|
}
|
|
//Todo: In Tools Auslagern
|
|
entity := eliofile.CountryCsvData{CountryID: filename[26:28], Data: record}
|
|
assignDataType(&entity, filename)
|
|
data <- entity
|
|
|
|
}
|
|
err = os.Rename(baseDir+"/"+filename, processedDir+"/"+filename)
|
|
if err != nil {
|
|
log.Critical(err.Error())
|
|
}
|
|
log.Infof("%s abgearbeitet.\n", filename)
|
|
return true
|
|
}
|
|
}
|
|
|
|
type DataService struct {
|
|
listener []event.EventListener
|
|
log logger.Logger
|
|
ef eliofile.ElioDateiFoo
|
|
newFolderPath string
|
|
processedFolderPath string
|
|
dirCreationRights int
|
|
}
|
|
|
|
func (d *DataService) ListenerCount() int {
|
|
return len(d.listener)
|
|
}
|
|
|
|
func (d *DataService) Run(ctx context.Context, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
fmt.Println("Ende")
|
|
return
|
|
default:
|
|
//Hier nach neuen Dateien suchen
|
|
for _, l := range d.listener {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
data := make(chan eliofile.CountryCsvData, 1)
|
|
go func() {
|
|
defer close(data)
|
|
err := d.ef.ScanCsv(ctx, data)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
}()
|
|
for warehouseDataItem := range data {
|
|
err := l.HandleData(ctx, warehouseDataItem)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
}
|
|
}
|
|
time.Sleep(time.Second * 5)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *DataService) AddListener(ev event.EventListener) {
|
|
d.listener = append(d.listener, ev)
|
|
}
|
|
|
|
// Option customizes a DataService. NewDataService applies the given options
// in order after it has set its defaults.
type Option func(*DataService)
|
|
|
|
func WithCustomCsvProcesser(process eliofile.ElioHandleFunc) Option {
|
|
return func(ds *DataService) {
|
|
ds.ef = *eliofile.NewElioDateiFoo(ds.newFolderPath, process)
|
|
}
|
|
}
|
|
|
|
// func WithCustomLookupDirectory(lookupDirectory string) Option {
|
|
// return func(ds *DataService) {
|
|
// ds.newFolderPath = lookupDirectory
|
|
// }
|
|
// }
|
|
|
|
// func WithCustomProcessedDirectory(processedDirectory string) Option {
|
|
// return func(ds *DataService) {
|
|
// ds.processedFolderPath = processedDirectory
|
|
// }
|
|
// }
|
|
|
|
func WithCustomDirectoryPermissions(permissions int) Option {
|
|
return func(ds *DataService) {
|
|
ds.dirCreationRights = permissions
|
|
}
|
|
}
|
|
|
|
func NewDataService(ctx context.Context, options ...Option) *DataService {
|
|
lookUpFolderPath, err := os.Getwd()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
newFolderPath := "/new"
|
|
processedFolderPath := "/processed"
|
|
dirCreationRights := 0755
|
|
l := logger.NewMarcoLogger()
|
|
ds := DataService{listener: make([]event.EventListener, 0),
|
|
log: l,
|
|
newFolderPath: lookUpFolderPath + newFolderPath,
|
|
processedFolderPath: lookUpFolderPath + processedFolderPath,
|
|
dirCreationRights: dirCreationRights}
|
|
|
|
ds.ef = *eliofile.NewElioDateiFoo(ds.newFolderPath, DataServiceDebugHandler(l, ds.newFolderPath, ds.processedFolderPath))
|
|
|
|
for _, opt := range options {
|
|
opt(&ds)
|
|
}
|
|
|
|
l.Infof("Suche CSV Dateien in %s.\n Verarbeitete Dateien gelangen in %s.\n", ds.newFolderPath, ds.processedFolderPath)
|
|
exists, err := tools.CheckDir(ds.newFolderPath)
|
|
if err == nil && !exists {
|
|
l.Infof("Erzeuge Verzeichnis: %s\n", ds.newFolderPath)
|
|
tools.Createdir(ds.newFolderPath, fs.FileMode(ds.dirCreationRights))
|
|
}
|
|
exists, err = tools.CheckDir(ds.processedFolderPath)
|
|
if err == nil && !exists {
|
|
l.Infof("Erzeuge Verzeichnis: %s\n", ds.processedFolderPath)
|
|
tools.Createdir(ds.processedFolderPath, fs.FileMode(ds.dirCreationRights))
|
|
}
|
|
return &ds
|
|
}
|