Speichern der Daten in eine MariaDB
This commit is contained in:
parent
b7b20386aa
commit
4c140eb7d2
@ -2,13 +2,12 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"gittea.marcokittel.de/elio/eliotools/datawriter/internal/app"
|
||||
"gittea.marcokittel.de/elio/eliotools/datawriter/internal/database"
|
||||
"gittea.marcokittel.de/elio/eliotools/datawriter/internal/dataservice"
|
||||
)
|
||||
|
||||
@ -20,8 +19,6 @@ const (
|
||||
)
|
||||
|
||||
func main() {
|
||||
a, _ := os.Getwd()
|
||||
fmt.Printf("%s\n", a)
|
||||
ctx, cancel := signal.NotifyContext(
|
||||
context.Background(),
|
||||
os.Interrupt,
|
||||
@ -30,11 +27,10 @@ func main() {
|
||||
|
||||
defer cancel()
|
||||
var wg sync.WaitGroup
|
||||
app := app.NewApp(&wg)
|
||||
ds := dataservice.NewDataService()
|
||||
ds.AddListener(app)
|
||||
db := database.NewDatabaseWriter(&wg)
|
||||
ds := dataservice.NewDataService(ctx)
|
||||
ds.AddListener(db)
|
||||
wg.Add(ds.ListenerCount())
|
||||
app.Run()
|
||||
ds.Run(ctx, &wg)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
21
docker-compose.yml
Normal file
21
docker-compose.yml
Normal file
@ -0,0 +1,21 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
mariadb:
|
||||
image: mariadb:10.6
|
||||
container_name: mariadb_test
|
||||
ports:
|
||||
- "3306:3306"
|
||||
environment:
|
||||
MYSQL_ROOT_PASSWORD: eliogeheim
|
||||
MYSQL_DATABASE: elio
|
||||
volumes:
|
||||
- mariadb_data:/var/lib/mysql
|
||||
healthcheck:
|
||||
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-p eliogeheim"]
|
||||
interval: 5s
|
||||
timeout: 3s
|
||||
retries: 3
|
||||
|
||||
volumes:
|
||||
mariadb_data: # Named Volume für Datenpersistenz
|
||||
31
erzeugeTestdaten.py
Normal file
31
erzeugeTestdaten.py
Normal file
@ -0,0 +1,31 @@
|
||||
import random
|
||||
import string
|
||||
|
||||
def generate_product_id():
|
||||
"""Generiert eine Produkt-ID im Format A1000, B3009, etc."""
|
||||
letter = random.choice(string.ascii_uppercase)
|
||||
number = random.randint(1000, 9999)
|
||||
return f"{letter}{number}"
|
||||
|
||||
def create_csv_file(filename="products.csv", entries=70000):
|
||||
"""Erstellt CSV-Datei mit angegebener Anzahl Einträge."""
|
||||
print(f"Erstelle CSV-Datei mit {entries} Einträgen...")
|
||||
|
||||
with open(filename, 'w', encoding='utf-8') as file:
|
||||
# Header schreiben
|
||||
file.write("product_id;quantity\n")
|
||||
|
||||
# Datenzeilen generieren
|
||||
for i in range(entries):
|
||||
product_id = generate_product_id()
|
||||
quantity = random.randint(1, 999)
|
||||
file.write(f"{product_id};{quantity}\n")
|
||||
|
||||
# Fortschritt anzeigen alle 100.000 Zeilen
|
||||
if (i + 1) % 100000 == 0:
|
||||
print(f"{i + 1} Zeilen geschrieben...")
|
||||
|
||||
print(f"CSV-Datei '{filename}' mit {entries} Einträgen erfolgreich erstellt!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
create_csv_file()
|
||||
11
go.mod
11
go.mod
@ -3,8 +3,13 @@ module gittea.marcokittel.de/elio/eliotools/datawriter
|
||||
go 1.24.4
|
||||
|
||||
require (
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628155152-bf86ab9c8376
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628161103-9cee287699cf
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628220203-a0ab9a6f347e
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628215830-0e90b68e2239
|
||||
)
|
||||
|
||||
require gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628161103-9cee287699cf
|
||||
require gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629091804-3a0cfb29665c
|
||||
|
||||
require (
|
||||
filippo.io/edwards25519 v1.1.0 // indirect
|
||||
github.com/go-sql-driver/mysql v1.9.3 // indirect
|
||||
)
|
||||
|
||||
36
go.sum
36
go.sum
@ -1,13 +1,47 @@
|
||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
|
||||
github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250626215804-b8e72f5652e6 h1:dru/Bcg52eUmNm2qnWnKPmpYB4VjnFPBdhWXARZOqPo=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250626215804-b8e72f5652e6/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628161103-9cee287699cf h1:safOKitxO+dbliwBM40U1/1e1sfupXmxTrPGst4PqFA=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628161103-9cee287699cf/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628215830-0e90b68e2239 h1:E7afhNxudV5Za3OKecweAXoidQtJ6OSOkVfhi/wL9LI=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628215830-0e90b68e2239/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628215928-4c6c751d8747 h1:hBG6rCkBs0nIaQxk7wV1AxIBb9OzqEkC8anXZugM9+g=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628215928-4c6c751d8747/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628220203-a0ab9a6f347e h1:cJLidZosWdAzSMD2/6S36pVHMFJAFFYs/Uz+BcAFSHA=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628220203-a0ab9a6f347e/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628222442-c3826f7f9298 h1:RiQ5BCuwUuZJPet4OVhGWp70gdqjMDu2T9nuoyA9ITM=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250628222442-c3826f7f9298/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629062036-dd6cb28e9aad h1:4aPD0xHD2ikTzt9YBGjjC2PcTiLQe5LVVMfTDDH+7Q0=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629062036-dd6cb28e9aad/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629062800-a8cf840640d4 h1:Q0c7NnzJKe6zIyas0uEBLuZSl7qiYE3DAn0Kkom0FQ0=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629062800-a8cf840640d4/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629065915-5858e70c0d1e h1:BnAEKzW56Rf1cwpdKYA2RFoso/YbjJh8ETXK2zQTZrA=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629065915-5858e70c0d1e/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629070619-17365edbbbd4 h1:VVZjBaSGFC/+tbrCULnkMGoxzhRFay0vug/TzuK81As=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629070619-17365edbbbd4/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629083023-6dadd17a58e0 h1:Wu/tgqPDmk681H7yG1Fi9l3Qv1WLrqlJS5AQLG0PVzc=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629083023-6dadd17a58e0/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629084524-b3367d8e5e5b h1:D0PK63zzzn86s63WIGT2avlTtXOo1D+EQc59e4Hu/Io=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629084524-b3367d8e5e5b/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629084753-6f75f05e0612 h1:/9RS+SbDwKiboAiEgGCb1cNi7wlionTupIfoMEbzCN0=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629084753-6f75f05e0612/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629091158-8b052bc038c5 h1:gsyAx7QWxAAZNuUgS5hS52SSHn67mdHBRlqEZxSg4sw=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629091158-8b052bc038c5/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629091804-3a0cfb29665c h1:W2WElxaKxa4y32IQtVHM6ziS3tnZlCDUMUiMSxHPE60=
|
||||
gittea.marcokittel.de/elio/eliotools/eliofile v0.0.0-20250629091804-3a0cfb29665c/go.mod h1:h9FfBWQD/1+fF3sFaYN89K3xH54t3LHKBKQj8YiAjnU=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250624203334-69cf94bf1eef h1:EzFzLINpiq712X2/t8ZMTLoWuqA6sRmpH3J0VpFi2Cg=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250624203334-69cf94bf1eef/go.mod h1:WDmnG6o72HhGTSkgwj2kXYcRL2MaNFNsKNBeTh6NIzo=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628153134-531bb5ba4145 h1:BgHWRbDKV+lGs3T9HIa8YYy5redu6MGHTcx8es4lQdc=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628153134-531bb5ba4145/go.mod h1:WDmnG6o72HhGTSkgwj2kXYcRL2MaNFNsKNBeTh6NIzo=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628155152-bf86ab9c8376 h1:Mozt8XRXoO7x8t/zAZPl/m4sxNmzpz4/sgtb6KFTU/M=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628155152-bf86ab9c8376/go.mod h1:WDmnG6o72HhGTSkgwj2kXYcRL2MaNFNsKNBeTh6NIzo=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628215830-0e90b68e2239 h1:gPYlG5mcdFfqjEjm9Y0rX3prch7bsAlkHLLNZA/P45g=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628215830-0e90b68e2239/go.mod h1:WDmnG6o72HhGTSkgwj2kXYcRL2MaNFNsKNBeTh6NIzo=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628220203-a0ab9a6f347e h1:tPxm0kVxFHkRRELo7KjrtdppShrAPg/IihYGC3lfjvo=
|
||||
gittea.marcokittel.de/elio/eliotools/logger v0.0.0-20250628220203-a0ab9a6f347e/go.mod h1:WDmnG6o72HhGTSkgwj2kXYcRL2MaNFNsKNBeTh6NIzo=
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250626212824-2199e71c62fd h1:NYRlflnOMzwcqEsAMYg79++/jHDNTkF2Rq5ZL0RcrBU=
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250626212824-2199e71c62fd/go.mod h1:jJvuXliNOiG9i8VXrY9vK5Bqv9QwDtswCs3CNIIBvUQ=
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628130612-3fccbe223735 h1:YGuVoCyj8C5Tv1cgfNuQG5hluGZkbkS2C2sR18r9Jw0=
|
||||
@ -18,3 +52,5 @@ gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628155152-bf86ab9c8376 h1
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628155152-bf86ab9c8376/go.mod h1:jJvuXliNOiG9i8VXrY9vK5Bqv9QwDtswCs3CNIIBvUQ=
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628161103-9cee287699cf h1:1D5PVm08fke9YzW2FPzXawehmNs2KG9xyfHXR8tWsGk=
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628161103-9cee287699cf/go.mod h1:jJvuXliNOiG9i8VXrY9vK5Bqv9QwDtswCs3CNIIBvUQ=
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628215830-0e90b68e2239 h1:ektVJ1MDFTRSAiP4rwd9zX9EcFFlJQMa/XA8m00BjMo=
|
||||
gittea.marcokittel.de/elio/eliotools/tools v0.0.0-20250628215830-0e90b68e2239/go.mod h1:jJvuXliNOiG9i8VXrY9vK5Bqv9QwDtswCs3CNIIBvUQ=
|
||||
|
||||
@ -1,42 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"gittea.marcokittel.de/elio/eliotools/datawriter/internal/dbwriter"
|
||||
"gittea.marcokittel.de/elio/eliotools/logger"
|
||||
)
|
||||
|
||||
type App struct {
|
||||
wg *sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
log logger.Logger
|
||||
}
|
||||
|
||||
func NewApp(wg *sync.WaitGroup) *App {
|
||||
a := App{log: logger.NewMarcoLogger(),
|
||||
wg: wg,
|
||||
}
|
||||
return &a
|
||||
}
|
||||
|
||||
func (a *App) HandleData(ctx context.Context, data dbwriter.MyStruct) error {
|
||||
a.log.Info("HandleData")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) AppendData(data dbwriter.MyStruct) error {
|
||||
a.log.Info("Verarbeite Daten: " + data.A + " " + string(data.B))
|
||||
//Todo
|
||||
return nil
|
||||
}
|
||||
func (a *App) Run() {
|
||||
if runtime.GOOS == "windows" {
|
||||
a.log.Fatal("Einfach nein!")
|
||||
}
|
||||
|
||||
a.log.Info("Applikation gestartet")
|
||||
|
||||
}
|
||||
102
internal/database/database.go
Normal file
102
internal/database/database.go
Normal file
@ -0,0 +1,102 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"gittea.marcokittel.de/elio/eliotools/eliofile"
|
||||
"gittea.marcokittel.de/elio/eliotools/logger"
|
||||
)
|
||||
|
||||
type DatabaseWriter struct {
|
||||
wg *sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
log logger.Logger
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func (d *DatabaseWriter) connectDB() (*sql.DB, error) {
|
||||
// Format: "username:password@tcp(host:port)/dbname?params"
|
||||
//
|
||||
db, err := sql.Open("mysql", "root:eliogeheim@tcp(127.0.0.1:3306)/elio?parseTime=true")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Verbindungspool konfigurieren
|
||||
db.SetMaxOpenConns(40)
|
||||
db.SetMaxIdleConns(25)
|
||||
db.SetConnMaxLifetime(5 * time.Minute)
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (d *DatabaseWriter) createTablesIfNotExist() error {
|
||||
|
||||
_, err := d.db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS warehouseproducts (
|
||||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||||
warehouse char(2) NOT NULL,
|
||||
productid VARCHAR(20) NOT NULL,
|
||||
amount INT DEFAULT 0,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT location_product_must_be_one UNIQUE (warehouse, productid)
|
||||
) ENGINE=InnoDB;
|
||||
|
||||
`)
|
||||
return err
|
||||
}
|
||||
|
||||
func NewDatabaseWriter(wg *sync.WaitGroup) *DatabaseWriter {
|
||||
db := DatabaseWriter{log: logger.NewMarcoLogger(),
|
||||
wg: wg,
|
||||
}
|
||||
sql, err := db.connectDB()
|
||||
if err != nil {
|
||||
fmt.Printf("Datenbank nicht gefunden. %w", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
db.db = sql
|
||||
err = db.createTablesIfNotExist()
|
||||
if err != nil {
|
||||
fmt.Printf("Datenbank erstellung fehlgeschlagen. %w", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return &db
|
||||
}
|
||||
|
||||
func (d *DatabaseWriter) UpdateOrInsertWarehouseProduct(warehouse string, productID string, amount int) error {
|
||||
_, err := d.db.Exec(`
|
||||
INSERT INTO warehouseproducts (warehouse, productid, amount)
|
||||
VALUES (?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE amount = VALUES(amount)`,
|
||||
warehouse, productID, amount)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("upsert failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DatabaseWriter) HandleData(ctx context.Context, data eliofile.CountryCsvData) error {
|
||||
// a.log.Info("HandleData")
|
||||
if strings.Contains(strings.Join(data.Data, ","), "product_id") {
|
||||
|
||||
return nil
|
||||
}
|
||||
menge := strings.Split(data.Data[0], ";")
|
||||
amount, err := strconv.Atoi(menge[1])
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return err
|
||||
}
|
||||
db.UpdateOrInsertWarehouseProduct(data.CountryID, menge[0], amount)
|
||||
return nil
|
||||
}
|
||||
@ -2,30 +2,59 @@ package dataservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gittea.marcokittel.de/elio/eliotools/datawriter/internal/dbwriter"
|
||||
"gittea.marcokittel.de/elio/eliotools/datawriter/internal/event"
|
||||
"gittea.marcokittel.de/elio/eliotools/eliofile"
|
||||
"gittea.marcokittel.de/elio/eliotools/logger"
|
||||
"gittea.marcokittel.de/elio/eliotools/tools"
|
||||
)
|
||||
|
||||
func DataServiceDebugHandler(log logger.Logger, processedDir string) func(filename string) bool {
|
||||
return func(filename string) bool {
|
||||
if tools.IsFilenameValid(filename) {
|
||||
t, err := tools.ExtractDateAndConvertToDate(filename)
|
||||
if err != nil {
|
||||
log.Warning(filename + "\n" + err.Error())
|
||||
return false
|
||||
}
|
||||
log.Info("Datei gefunden.: " + filename + " " + t.String() + "\n")
|
||||
return true
|
||||
// Daten über Channel aus der Goroutine ausführen. CounterID stellt sicher, dass das Herkunftsland der Daten bekannt ist.
|
||||
|
||||
// Rückgabe eines Closures zur Verbeitung einer CSV Datei. Posix Only. No Windows.
|
||||
func DataServiceDebugHandler(log logger.Logger, baseDir string, processedDir string) func(ctx context.Context, filename string, data chan<- eliofile.CountryCsvData) bool {
|
||||
return func(ctx context.Context, filename string, data chan<- eliofile.CountryCsvData) bool {
|
||||
log.Infof("%s %s \n", baseDir, processedDir)
|
||||
if !tools.IsFilenameValid(filename) {
|
||||
log.Warning(filename + "Überspringe Datei: " + filename + "\n")
|
||||
return false
|
||||
}
|
||||
return false
|
||||
t, err := tools.ExtractDateAndConvertToDate(filename)
|
||||
if err != nil {
|
||||
log.Warning(filename + "\n" + err.Error())
|
||||
return false
|
||||
}
|
||||
log.Info("Datei gefunden.: " + filename + " " + t.String() + "\n")
|
||||
file, err := os.Open(baseDir + "/" + filename)
|
||||
defer file.Close()
|
||||
if err != nil {
|
||||
log.Warning(err.Error())
|
||||
}
|
||||
r := csv.NewReader(file)
|
||||
record, err := r.Read()
|
||||
if err != nil {
|
||||
log.Critical(err.Error())
|
||||
return false
|
||||
}
|
||||
for {
|
||||
record, err = r.Read()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
data <- eliofile.CountryCsvData{CountryID: filename[26:28], Data: record}
|
||||
}
|
||||
err = os.Rename(baseDir+"/"+filename, processedDir+"/"+filename)
|
||||
if err != nil {
|
||||
log.Critical(err.Error())
|
||||
}
|
||||
log.Infof("%s abgearbeitet.\n", filename)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,17 +76,32 @@ func (d *DataService) Run(ctx context.Context, wg *sync.WaitGroup) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("Ende")
|
||||
return
|
||||
default:
|
||||
//Hier nach neuen Dateien suchen
|
||||
for _, l := range d.listener {
|
||||
d.ef.ScanCsv()
|
||||
err := l.HandleData(ctx, dbwriter.MyStruct{A: "dkl", B: 3})
|
||||
if err != nil {
|
||||
//todo
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
data := make(chan eliofile.CountryCsvData, 100)
|
||||
go func() {
|
||||
defer close(data)
|
||||
err := d.ef.ScanCsv(ctx, data)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
for warehouseDataItem := range data {
|
||||
err := l.HandleData(ctx, warehouseDataItem)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -92,7 +136,7 @@ func WithCustomDirectoryPermissions(permissions int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func NewDataService(options ...Option) *DataService {
|
||||
func NewDataService(ctx context.Context, options ...Option) *DataService {
|
||||
lookUpFolderPath, err := os.Getwd()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@ -107,7 +151,7 @@ func NewDataService(options ...Option) *DataService {
|
||||
processedFolderPath: lookUpFolderPath + processedFolderPath,
|
||||
dirCreationRights: dirCreationRights}
|
||||
|
||||
ds.ef = *eliofile.NewElioDateiFoo(ds.newFolderPath, DataServiceDebugHandler(l, ds.processedFolderPath))
|
||||
ds.ef = *eliofile.NewElioDateiFoo(ds.newFolderPath, DataServiceDebugHandler(l, ds.newFolderPath, ds.processedFolderPath))
|
||||
|
||||
for _, opt := range options {
|
||||
opt(&ds)
|
||||
|
||||
@ -1,19 +0,0 @@
|
||||
package dbwriter
|
||||
|
||||
import "fmt"
|
||||
|
||||
type MyStruct struct {
|
||||
A string
|
||||
B int
|
||||
}
|
||||
|
||||
type DBWriter interface {
|
||||
write(data MyStruct)
|
||||
}
|
||||
|
||||
type DBWrite struct {
|
||||
}
|
||||
|
||||
func (d *DBWrite) write(data MyStruct) {
|
||||
fmt.Println("\n\n DB WRITER \n\n")
|
||||
}
|
||||
@ -3,9 +3,9 @@ package event
|
||||
import (
|
||||
"context"
|
||||
|
||||
"gittea.marcokittel.de/elio/eliotools/datawriter/internal/dbwriter"
|
||||
"gittea.marcokittel.de/elio/eliotools/eliofile"
|
||||
)
|
||||
|
||||
type EventListener interface {
|
||||
HandleData(ctx context.Context, data dbwriter.MyStruct) error
|
||||
HandleData(ctx context.Context, data eliofile.CountryCsvData) error
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user