dodanie kvstore

This commit is contained in:
mateusz779 2024-11-07 11:50:03 +01:00
parent 4fc1b4d5b9
commit f3dadd857c
3 changed files with 308 additions and 27 deletions

156
kvstore/kvstore.go Normal file
View File

@ -0,0 +1,156 @@
package kvstore
import (
"encoding/json"
"os"
"sync"
"time"
)
type DataEntry struct {
Value string `json:"value"`
Timestamp time.Time `json:"timestamp"`
}
type KVStore struct {
data sync.Map // klucz: string, wartość: DataEntry
filePath string
done chan struct{}
}
func NewKVStore(filePath string) (*KVStore, error) {
store := &KVStore{
filePath: filePath,
done: make(chan struct{}),
}
if _, err := os.Stat(filePath); !os.IsNotExist(err) {
if err := store.load(); err != nil {
return nil, err
}
}
go store.periodicSave()
return store, nil
}
// Set zapisuje wartość z automatycznym timestampem
func (kv *KVStore) Set(key string, value string) {
entry := DataEntry{
Value: value,
Timestamp: time.Now(),
}
kv.data.Store(key, entry)
}
// SetWithTimestamp pozwala na ustawienie własnego timestampa
func (kv *KVStore) SetWithTimestamp(key string, value string, timestamp time.Time) {
entry := DataEntry{
Value: value,
Timestamp: timestamp,
}
kv.data.Store(key, entry)
}
// Get zwraca DataEntry dla danego klucza
func (kv *KVStore) Get(key string) (DataEntry, bool) {
if value, ok := kv.data.Load(key); ok {
return value.(DataEntry), true
}
return DataEntry{}, false
}
// GetValue zwraca tylko wartość (string) dla danego klucza
func (kv *KVStore) GetValue(key string) (string, bool) {
if value, ok := kv.data.Load(key); ok {
return value.(DataEntry).Value, true
}
return "", false
}
func (kv *KVStore) Delete(key string) {
kv.data.Delete(key)
}
func (kv *KVStore) periodicSave() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
kv.save()
case <-kv.done:
return
}
}
}
func (kv *KVStore) load() error {
file, err := os.ReadFile(kv.filePath)
if err != nil {
return err
}
var data map[string]DataEntry
if err := json.Unmarshal(file, &data); err != nil {
return err
}
for k, v := range data {
kv.data.Store(k, v)
}
return nil
}
func (kv *KVStore) save() error {
data := make(map[string]DataEntry)
kv.data.Range(func(key, value interface{}) bool {
if k, ok := key.(string); ok {
data[k] = value.(DataEntry)
}
return true
})
file, err := json.Marshal(data)
if err != nil {
return err
}
return os.WriteFile(kv.filePath, file, 0644)
}
func (kv *KVStore) Close() error {
close(kv.done)
return kv.save()
}
// Dodatkowe pomocnicze metody
// GetAll zwraca wszystkie wpisy
func (kv *KVStore) GetAll() map[string]DataEntry {
result := make(map[string]DataEntry)
kv.data.Range(func(key, value interface{}) bool {
if k, ok := key.(string); ok {
result[k] = value.(DataEntry)
}
return true
})
return result
}
// GetEntriesAfter zwraca wpisy po określonej dacie
func (kv *KVStore) GetEntriesAfter(timestamp time.Time) map[string]DataEntry {
result := make(map[string]DataEntry)
kv.data.Range(func(key, value interface{}) bool {
if k, ok := key.(string); ok {
entry := value.(DataEntry)
if entry.Timestamp.After(timestamp) {
result[k] = entry
}
}
return true
})
return result
}

View File

@ -0,0 +1,117 @@
package kvstore
import (
"fmt"
"os"
"strconv"
"sync"
"testing"
)
func BenchmarkKVStore_Set_Parallel(b *testing.B) {
store, err := NewKVStore("benchmark_test.db")
if err != nil {
b.Fatalf("Failed to create KVStore: %v", err)
}
defer os.Remove("benchmark_test.db")
defer store.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
counter := 0
for pb.Next() {
store.Set(fmt.Sprintf("key-%d", counter), strconv.Itoa(counter))
counter++
}
})
}
func BenchmarkKVStore_Get_Parallel(b *testing.B) {
store, err := NewKVStore("benchmark_test.db")
if err != nil {
b.Fatalf("Failed to create KVStore: %v", err)
}
defer os.Remove("benchmark_test.db")
defer store.Close()
// Przygotowanie danych
for i := 0; i < 1000; i++ {
store.Set(fmt.Sprintf("key-%d", i), strconv.Itoa(i))
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
counter := 0
for pb.Next() {
store.Get(fmt.Sprintf("key-%d", counter%1000))
counter++
}
})
}
func BenchmarkKVStore_Mixed_Parallel(b *testing.B) {
store, err := NewKVStore("benchmark_test.db")
if err != nil {
b.Fatalf("Failed to create KVStore: %v", err)
}
defer os.Remove("benchmark_test.db")
defer store.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
counter := 0
for pb.Next() {
key := "key-" + strconv.Itoa(counter)
switch counter % 3 {
case 0:
store.Set(key, strconv.Itoa(counter))
case 1:
store.Get(key)
case 2:
store.Delete(key)
}
counter++
}
})
}
func BenchmarkKVStore_Concurrent_HeavyLoad(b *testing.B) {
store, err := NewKVStore("benchmark_test.db")
if err != nil {
b.Fatalf("Failed to create KVStore: %v", err)
}
defer os.Remove("benchmark_test.db")
defer store.Close()
var wg sync.WaitGroup
workers := 100
opsPerWorker := b.N / workers
b.ResetTimer()
// Writers
for i := 0; i < workers/2; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := 0; j < opsPerWorker; j++ {
key := fmt.Sprintf("key-%d-%d", workerID, j)
store.Set(key, strconv.Itoa(j))
}
}(i)
}
// Readers
for i := workers / 2; i < workers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := 0; j < opsPerWorker; j++ {
key := fmt.Sprintf("key-%d-%d", workerID%50, j)
store.Get(key)
}
}(i)
}
wg.Wait()
}

62
main.go
View File

@ -5,24 +5,30 @@ package main
import (
"encoding/json"
"fmt"
"log"
"mkedziora/fast-links/kvstore"
"net/http"
"strconv"
"sync"
"time"
)
type DataEntry struct {
Value string
Timestamp time.Time
}
// var (
// dataMap map[string]DataEntry
// mu sync.Mutex
// )
var (
dataMap map[string]DataEntry
mu sync.Mutex
)
// func init() {
// dataMap = make(map[string]DataEntry)
// }
var store *kvstore.KVStore
func init() {
dataMap = make(map[string]DataEntry)
var err error
store, err = kvstore.NewKVStore("data.db")
if err != nil {
log.Fatalf("Failed to create KVStore: %v", err)
}
}
func setHandler(w http.ResponseWriter, r *http.Request) {
@ -42,7 +48,7 @@ func setHandler(w http.ResponseWriter, r *http.Request) {
return
}
mu.Lock()
// mu.Lock()
if input.ID == "" {
input.ID = "default"
}
@ -64,11 +70,12 @@ func setHandler(w http.ResponseWriter, r *http.Request) {
expirationTime := time.Now().Add(time.Duration(expireSeconds) * time.Second)
dataMap[input.ID] = DataEntry{
Value: input.Data,
Timestamp: expirationTime,
}
mu.Unlock()
store.SetWithTimestamp("key_"+input.ID, input.Data, expirationTime)
// dataMap[input.ID] = DataEntry{
// Value: input.Data,
// Timestamp: expirationTime,
// }
// mu.Unlock()
w.WriteHeader(http.StatusNoContent)
}
@ -79,12 +86,14 @@ func getHandler(w http.ResponseWriter, r *http.Request) {
id = "default"
}
mu.Lock()
entry, exists := dataMap[id]
mu.Unlock()
// mu.Lock()
// entry, exists := dataMap[id]
// mu.Unlock()
entry, exists := store.Get("key_" + id)
if !exists || time.Since(entry.Timestamp) > 30*time.Second {
delete(dataMap, id)
store.Delete("key_" + id)
http.Error(w, "No data available", http.StatusNotFound)
return
}
@ -99,12 +108,9 @@ func getUrlHandler(w http.ResponseWriter, r *http.Request) {
id = "default"
}
mu.Lock()
entry, exists := dataMap[id]
mu.Unlock()
entry, exists := store.Get("key_" + id)
if !exists || time.Since(entry.Timestamp) > 30*time.Second {
delete(dataMap, id)
store.Delete("key_" + id)
http.Error(w, "No data available", http.StatusNotFound)
return
}
@ -114,6 +120,7 @@ func getUrlHandler(w http.ResponseWriter, r *http.Request) {
}
func main() {
// Obsługa endpointów API
http.HandleFunc("/api/set", setHandler)
http.HandleFunc("/api/get", getHandler)
@ -150,9 +157,10 @@ func main() {
go func() {
for {
for id, timestamp := range dataMap {
for id, timestamp := range store.GetAll() {
if time.Now().After(timestamp.Timestamp) {
delete(dataMap, id)
store.Delete(id)
break
}
}