From f3dadd857ccdd8c25141974eeefbbf2af7f0a127 Mon Sep 17 00:00:00 2001 From: mateusz779 Date: Thu, 7 Nov 2024 11:50:03 +0100 Subject: [PATCH] dodanie kvstore --- kvstore/kvstore.go | 156 ++++++++++++++++++++++++++++++++++ kvstore/kvstore_bench_test.go | 117 +++++++++++++++++++++++++ main.go | 62 ++++++++------ 3 files changed, 308 insertions(+), 27 deletions(-) create mode 100644 kvstore/kvstore.go create mode 100644 kvstore/kvstore_bench_test.go diff --git a/kvstore/kvstore.go b/kvstore/kvstore.go new file mode 100644 index 0000000..532ac5e --- /dev/null +++ b/kvstore/kvstore.go @@ -0,0 +1,156 @@ +package kvstore + +import ( + "encoding/json" + "os" + "sync" + "time" +) + +type DataEntry struct { + Value string `json:"value"` + Timestamp time.Time `json:"timestamp"` +} + +type KVStore struct { + data sync.Map // klucz: string, wartość: DataEntry + filePath string + done chan struct{} +} + +func NewKVStore(filePath string) (*KVStore, error) { + store := &KVStore{ + filePath: filePath, + done: make(chan struct{}), + } + + if _, err := os.Stat(filePath); !os.IsNotExist(err) { + if err := store.load(); err != nil { + return nil, err + } + } + + go store.periodicSave() + return store, nil +} + +// Set zapisuje wartość z automatycznym timestampem +func (kv *KVStore) Set(key string, value string) { + entry := DataEntry{ + Value: value, + Timestamp: time.Now(), + } + kv.data.Store(key, entry) +} + +// SetWithTimestamp pozwala na ustawienie własnego timestampa +func (kv *KVStore) SetWithTimestamp(key string, value string, timestamp time.Time) { + entry := DataEntry{ + Value: value, + Timestamp: timestamp, + } + kv.data.Store(key, entry) +} + +// Get zwraca DataEntry dla danego klucza +func (kv *KVStore) Get(key string) (DataEntry, bool) { + if value, ok := kv.data.Load(key); ok { + return value.(DataEntry), true + } + return DataEntry{}, false +} + +// GetValue zwraca tylko wartość (string) dla danego klucza +func (kv *KVStore) GetValue(key string) (string, bool) { + if value, ok := kv.data.Load(key); ok { + return value.(DataEntry).Value, true + } + return "", false +} + +func (kv *KVStore) Delete(key string) { + kv.data.Delete(key) +} + +func (kv *KVStore) periodicSave() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + kv.save() + case <-kv.done: + return + } + } +} + +func (kv *KVStore) load() error { + file, err := os.ReadFile(kv.filePath) + if err != nil { + return err + } + + var data map[string]DataEntry + if err := json.Unmarshal(file, &data); err != nil { + return err + } + + for k, v := range data { + kv.data.Store(k, v) + } + + return nil +} + +func (kv *KVStore) save() error { + data := make(map[string]DataEntry) + kv.data.Range(func(key, value interface{}) bool { + if k, ok := key.(string); ok { + data[k] = value.(DataEntry) + } + return true + }) + + file, err := json.Marshal(data) + if err != nil { + return err + } + + return os.WriteFile(kv.filePath, file, 0644) +} + +func (kv *KVStore) Close() error { + close(kv.done) + return kv.save() +} + +// Dodatkowe pomocnicze metody + +// GetAll zwraca wszystkie wpisy +func (kv *KVStore) GetAll() map[string]DataEntry { + result := make(map[string]DataEntry) + kv.data.Range(func(key, value interface{}) bool { + if k, ok := key.(string); ok { + result[k] = value.(DataEntry) + } + return true + }) + return result +} + +// GetEntriesAfter zwraca wpisy po określonej dacie +func (kv *KVStore) GetEntriesAfter(timestamp time.Time) map[string]DataEntry { + result := make(map[string]DataEntry) + kv.data.Range(func(key, value interface{}) bool { + if k, ok := key.(string); ok { + entry := value.(DataEntry) + if entry.Timestamp.After(timestamp) { + result[k] = entry + } + } + return true + }) + return result +} diff --git a/kvstore/kvstore_bench_test.go b/kvstore/kvstore_bench_test.go new file mode 100644 index 0000000..3ca5299 --- /dev/null +++ b/kvstore/kvstore_bench_test.go @@ -0,0 +1,117 @@ +package kvstore + +import ( + "fmt" + "os" + "strconv" + "sync" + "testing" +) + +func BenchmarkKVStore_Set_Parallel(b *testing.B) { + store, err := NewKVStore("benchmark_test.db") + if err != nil { + b.Fatalf("Failed to create KVStore: %v", err) + } + defer os.Remove("benchmark_test.db") + defer store.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + counter := 0 + for pb.Next() { + store.Set(fmt.Sprintf("key-%d", counter), strconv.Itoa(counter)) + counter++ + } + }) +} + +func BenchmarkKVStore_Get_Parallel(b *testing.B) { + store, err := NewKVStore("benchmark_test.db") + if err != nil { + b.Fatalf("Failed to create KVStore: %v", err) + } + defer os.Remove("benchmark_test.db") + defer store.Close() + + // Przygotowanie danych + for i := 0; i < 1000; i++ { + store.Set(fmt.Sprintf("key-%d", i), strconv.Itoa(i)) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + counter := 0 + for pb.Next() { + store.Get(fmt.Sprintf("key-%d", counter%1000)) + counter++ + } + }) +} + +func BenchmarkKVStore_Mixed_Parallel(b *testing.B) { + store, err := NewKVStore("benchmark_test.db") + if err != nil { + b.Fatalf("Failed to create KVStore: %v", err) + } + defer os.Remove("benchmark_test.db") + defer store.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + counter := 0 + for pb.Next() { + key := "key-" + strconv.Itoa(counter) + switch counter % 3 { + case 0: + store.Set(key, strconv.Itoa(counter)) + case 1: + store.Get(key) + case 2: + store.Delete(key) + } + counter++ + } + }) +} + +func BenchmarkKVStore_Concurrent_HeavyLoad(b *testing.B) { + store, err := NewKVStore("benchmark_test.db") + if err != nil { + b.Fatalf("Failed to create KVStore: %v", err) + } + defer os.Remove("benchmark_test.db") + defer store.Close() + + var wg sync.WaitGroup + workers := 100 + opsPerWorker := b.N / workers + + b.ResetTimer() + + // Writers + for i := 0; i < workers/2; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for j := 0; j < opsPerWorker; j++ { + key := fmt.Sprintf("key-%d-%d", workerID, j) + store.Set(key, strconv.Itoa(j)) + } + }(i) + } + + // Readers + for i := workers / 2; i < workers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for j := 0; j < opsPerWorker; j++ { + key := fmt.Sprintf("key-%d-%d", workerID%50, j) + store.Get(key) + } + }(i) + } + + wg.Wait() +} diff --git a/main.go b/main.go index c674cb9..2ff596c 100644 --- a/main.go +++ b/main.go @@ -5,24 +5,30 @@ package main import ( "encoding/json" "fmt" + "log" + "mkedziora/fast-links/kvstore" "net/http" "strconv" - "sync" "time" ) -type DataEntry struct { - Value string - Timestamp time.Time -} +// var ( +// dataMap map[string]DataEntry +// mu sync.Mutex +// ) -var ( - dataMap map[string]DataEntry - mu sync.Mutex -) +// func init() { +// dataMap = make(map[string]DataEntry) +// } + +var store *kvstore.KVStore func init() { - dataMap = make(map[string]DataEntry) + var err error + store, err = kvstore.NewKVStore("data.db") + if err != nil { + log.Fatalf("Failed to create KVStore: %v", err) + } } func setHandler(w http.ResponseWriter, r *http.Request) { @@ -42,7 +48,7 @@ func setHandler(w http.ResponseWriter, r *http.Request) { return } - mu.Lock() + // mu.Lock() if input.ID == "" { input.ID = "default" } @@ -64,11 +70,12 @@ func setHandler(w http.ResponseWriter, r *http.Request) { expirationTime := time.Now().Add(time.Duration(expireSeconds) * time.Second) - dataMap[input.ID] = DataEntry{ - Value: input.Data, - Timestamp: expirationTime, - } - mu.Unlock() + store.SetWithTimestamp("key_"+input.ID, input.Data, expirationTime) + // dataMap[input.ID] = DataEntry{ + // Value: input.Data, + // Timestamp: expirationTime, + // } + // mu.Unlock() w.WriteHeader(http.StatusNoContent) } @@ -79,12 +86,14 @@ func getHandler(w http.ResponseWriter, r *http.Request) { id = "default" } - mu.Lock() - entry, exists := dataMap[id] - mu.Unlock() + // mu.Lock() + // entry, exists := dataMap[id] + // mu.Unlock() + + entry, exists := store.Get("key_" + id) if !exists || time.Since(entry.Timestamp) > 30*time.Second { - delete(dataMap, id) + store.Delete("key_" + id) http.Error(w, "No data available", http.StatusNotFound) return } @@ -99,12 +108,9 @@ func getUrlHandler(w http.ResponseWriter, r *http.Request) { id = "default" } - mu.Lock() - entry, exists := dataMap[id] - mu.Unlock() - + entry, exists := store.Get("key_" + id) if !exists || time.Since(entry.Timestamp) > 30*time.Second { - delete(dataMap, id) + store.Delete("key_" + id) http.Error(w, "No data available", http.StatusNotFound) return } @@ -114,6 +120,7 @@ func getUrlHandler(w http.ResponseWriter, r *http.Request) { } func main() { + // Obsługa endpointów API http.HandleFunc("/api/set", setHandler) http.HandleFunc("/api/get", getHandler) @@ -150,9 +157,10 @@ func main() { go func() { for { - for id, timestamp := range dataMap { + + for id, timestamp := range store.GetAll() { if time.Now().After(timestamp.Timestamp) { - delete(dataMap, id) + store.Delete(id) break } }