Skip to content

Commit 241531f

Browse files
mergify[bot]efd6
andauthored
[9.0](backport #43063) filebeat,libbeat,x-pack/filebeat: clean up state store types (#43366)
* filebeat,libbeat,x-pack/filebeat: clean up state store types (#43063) This centralises the state store to simplify and clarify the use of persistent stores. (cherry picked from commit df62e10) # Conflicts: # filebeat/beater/filebeat.go # filebeat/input/journald/environment_test.go * fix conflicts --------- Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
1 parent 0105813 commit 241531f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+152
-143
lines changed

Diff for: filebeat/beater/filebeat.go

+3-11
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"path/filepath"
2525
"strings"
2626
"sync"
27-
"time"
2827

2928
"github.com/elastic/beats/v7/filebeat/backup"
3029
"github.com/elastic/beats/v7/filebeat/channel"
@@ -79,14 +78,7 @@ type Filebeat struct {
7978
pipeline beat.PipelineConnector
8079
}
8180

82-
type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin
83-
84-
type StateStore interface {
85-
// Access returns the storage registry depending on the type. This is needed for the Elasticsearch state store which
86-
// is guarded by the feature.IsElasticsearchStateStoreEnabledForInput(typ) check.
87-
Access(typ string) (*statestore.Store, error)
88-
CleanupInterval() time.Duration
89-
}
81+
type PluginFactory func(beat.Info, *logp.Logger, statestore.States) []v2.Plugin
9082

9183
// New creates a new Filebeat pointer instance.
9284
func New(plugins PluginFactory) beat.Creator {
@@ -568,7 +560,7 @@ func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.Pip
568560

569561
// some of the filestreams might want to take over the loginput state
570562
// if their `take_over` flag is set to `true`.
571-
func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
563+
func processLogInputTakeOver(stateStore statestore.States, config *cfg.Config) error {
572564
inputs, err := fetchInputConfiguration(config)
573565
if err != nil {
574566
return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err)
@@ -577,7 +569,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
577569
return nil
578570
}
579571

580-
store, err := stateStore.Access("")
572+
store, err := stateStore.StoreFor("")
581573
if err != nil {
582574
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
583575
}

Diff for: filebeat/beater/store.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import (
3232
"github.com/elastic/elastic-agent-libs/paths"
3333
)
3434

35+
var _ statestore.States = (*filebeatStore)(nil)
36+
3537
type filebeatStore struct {
3638
registry *statestore.Registry
3739
esRegistry *statestore.Registry
@@ -84,8 +86,8 @@ func (s *filebeatStore) Close() {
8486
s.registry.Close()
8587
}
8688

87-
// Access returns the storage registry depending on the type. Default is the file store.
88-
func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
89+
// StoreFor returns the storage registry depending on the type. Default is the file store.
90+
func (s *filebeatStore) StoreFor(typ string) (*statestore.Store, error) {
8991
if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil {
9092
return s.esRegistry.Get(s.storeName)
9193
}

Diff for: filebeat/input/default-inputs/inputs.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,25 @@
1818
package inputs
1919

2020
import (
21-
"github.com/elastic/beats/v7/filebeat/beater"
2221
"github.com/elastic/beats/v7/filebeat/input/filestream"
2322
"github.com/elastic/beats/v7/filebeat/input/kafka"
2423
"github.com/elastic/beats/v7/filebeat/input/tcp"
2524
"github.com/elastic/beats/v7/filebeat/input/udp"
2625
"github.com/elastic/beats/v7/filebeat/input/unix"
2726
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
2827
"github.com/elastic/beats/v7/libbeat/beat"
28+
"github.com/elastic/beats/v7/libbeat/statestore"
2929
"github.com/elastic/elastic-agent-libs/logp"
3030
)
3131

32-
func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.Plugin {
32+
func Init(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin {
3333
return append(
3434
genericInputs(log, components),
3535
osInputs(info, log, components)...,
3636
)
3737
}
3838

39-
func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin {
39+
func genericInputs(log *logp.Logger, components statestore.States) []v2.Plugin {
4040
return []v2.Plugin{
4141
filestream.Plugin(log, components),
4242
kafka.Plugin(),

Diff for: filebeat/input/default-inputs/inputs_linux.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,14 @@ package inputs
2020
import (
2121
"github.com/elastic/beats/v7/filebeat/input/journald"
2222
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
23-
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
2423
"github.com/elastic/beats/v7/libbeat/beat"
24+
"github.com/elastic/beats/v7/libbeat/statestore"
2525
"github.com/elastic/elastic-agent-libs/logp"
2626
)
2727

2828
// inputs that are only supported on linux
2929

30-
type osComponents interface {
31-
cursor.StateStore
32-
}
33-
34-
func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
30+
func osInputs(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin {
3531
var plugins []v2.Plugin
3632

3733
zeroPlugin := v2.Plugin{}

Diff for: filebeat/input/default-inputs/inputs_windows.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,13 @@ package inputs
1919

2020
import (
2121
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
22-
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
2322
"github.com/elastic/beats/v7/filebeat/input/winlog"
2423
"github.com/elastic/beats/v7/libbeat/beat"
24+
"github.com/elastic/beats/v7/libbeat/statestore"
2525
"github.com/elastic/elastic-agent-libs/logp"
2626
)
2727

28-
type osComponents interface {
29-
cursor.StateStore
30-
}
31-
32-
func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
28+
func osInputs(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin {
3329
return []v2.Plugin{
3430
winlog.Plugin(log, components),
3531
}

Diff for: filebeat/input/filestream/environment_test.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import (
4848
type inputTestingEnvironment struct {
4949
t *testing.T
5050
workingDir string
51-
stateStore loginp.StateStore
51+
stateStore statestore.States
5252
pipeline *mockPipelineConnector
5353

5454
pluginInitOnce sync.Once
@@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string {
194194
}
195195

196196
func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
197-
inputStore, _ := e.stateStore.Access("")
197+
inputStore, _ := e.stateStore.StoreFor("")
198198

199199
actual := 0
200200
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
@@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
331331
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
332332
}
333333

334-
inputStore, _ := e.stateStore.Access("")
334+
inputStore, _ := e.stateStore.StoreFor("")
335335
id := getIDFromPath(filepath, inputID, fi)
336336

337337
var entry registryEntry
@@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect
352352
}
353353

354354
func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) {
355-
inputStore, _ := e.stateStore.Access("")
355+
inputStore, _ := e.stateStore.StoreFor("")
356356

357357
var entry registryEntry
358358
err := inputStore.Get(key, &entry)
@@ -539,11 +539,13 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
539539
require.True(e.t, selectedEvent.Timestamp.Equal(tm), "got: %s, expected: %s", selectedEvent.Timestamp.String(), tm.String())
540540
}
541541

542+
var _ statestore.States = (*testInputStore)(nil)
543+
542544
type testInputStore struct {
543545
registry *statestore.Registry
544546
}
545547

546-
func openTestStatestore() loginp.StateStore {
548+
func openTestStatestore() statestore.States {
547549
return &testInputStore{
548550
registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()),
549551
}
@@ -553,7 +555,7 @@ func (s *testInputStore) Close() {
553555
s.registry.Close()
554556
}
555557

556-
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
558+
func (s *testInputStore) StoreFor(string) (*statestore.Store, error) {
557559
return s.registry.Get("filebeat")
558560
}
559561

Diff for: filebeat/input/filestream/input.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"github.com/elastic/beats/v7/libbeat/reader/parser"
4141
"github.com/elastic/beats/v7/libbeat/reader/readfile"
4242
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
43+
"github.com/elastic/beats/v7/libbeat/statestore"
4344
conf "github.com/elastic/elastic-agent-libs/config"
4445
"github.com/elastic/elastic-agent-libs/logp"
4546
"github.com/elastic/elastic-agent-libs/mapstr"
@@ -67,7 +68,7 @@ type filestream struct {
6768
}
6869

6970
// Plugin creates a new filestream input plugin for creating a stateful input.
70-
func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
71+
func Plugin(log *logp.Logger, store statestore.States) input.Plugin {
7172
return input.Plugin{
7273
Name: pluginName,
7374
Stability: feature.Stable,

Diff for: filebeat/input/filestream/input_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828

2929
"github.com/stretchr/testify/require"
3030

31-
loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
3231
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
3332
"github.com/elastic/beats/v7/libbeat/beat"
3433
"github.com/elastic/beats/v7/libbeat/statestore"
@@ -235,10 +234,12 @@ func generateFile(t testing.TB, dir string, lineCount int) string {
235234
return filename
236235
}
237236

238-
func createTestStore(t testing.TB) loginp.StateStore {
237+
func createTestStore(t testing.TB) statestore.States {
239238
return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())}
240239
}
241240

241+
var _ statestore.States = (*testStore)(nil)
242+
242243
type testStore struct {
243244
registry *statestore.Registry
244245
}
@@ -247,7 +248,7 @@ func (s *testStore) Close() {
247248
s.registry.Close()
248249
}
249250

250-
func (s *testStore) Access(_ string) (*statestore.Store, error) {
251+
func (s *testStore) StoreFor(string) (*statestore.Store, error) {
251252
return s.registry.Get("filestream-benchmark")
252253
}
253254

Diff for: filebeat/input/filestream/internal/input-logfile/manager.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type InputManager struct {
5151
Logger *logp.Logger
5252

5353
// StateStore gives the InputManager access to the persistent key value store.
54-
StateStore StateStore
54+
StateStore statestore.States
5555

5656
// Type must contain the name of the input type. It is used to create the key name
5757
// for all sources the inputs collect from.
@@ -87,12 +87,6 @@ var errNoInputRunner = errors.New("no input runner available")
8787
// Deprecated: Inputs without an ID are not supported anymore.
8888
const globalInputID = ".global"
8989

90-
// StateStore interface and configurations used to give the Manager access to the persistent store.
91-
type StateStore interface {
92-
Access(typ string) (*statestore.Store, error)
93-
CleanupInterval() time.Duration
94-
}
95-
9690
func (cim *InputManager) init() error {
9791
cim.initOnce.Do(func() {
9892

Diff for: filebeat/input/filestream/internal/input-logfile/store.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,10 @@ type (
141141
// hook into store close for testing purposes
142142
var closeStore = (*store).close
143143

144-
func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
144+
func openStore(log *logp.Logger, statestore statestore.States, prefix string) (*store, error) {
145145
ok := false
146146

147-
persistentStore, err := statestore.Access("")
147+
persistentStore, err := statestore.StoreFor("")
148148
if err != nil {
149149
return nil, err
150150
}

Diff for: filebeat/input/filestream/internal/input-logfile/store_test.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,6 @@ import (
3535
"github.com/elastic/go-concert/unison"
3636
)
3737

38-
type testStateStore struct {
39-
Store *statestore.Store
40-
GCPeriod time.Duration
41-
}
42-
4338
func TestResource_CopyInto(t *testing.T) {
4439
src := resource{lock: unison.MakeMutex()}
4540
dst := resource{lock: unison.MakeMutex()}
@@ -476,7 +471,7 @@ func closeStoreWith(fn func(s *store)) func() {
476471
}
477472

478473
//nolint:unparam // It's a test helper
479-
func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *store {
474+
func testOpenStore(t *testing.T, prefix string, persistentStore statestore.States) *store {
480475
if persistentStore == nil {
481476
persistentStore = createSampleStore(t, nil)
482477
}
@@ -506,9 +501,16 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore {
506501
}
507502
}
508503

504+
var _ statestore.States = testStateStore{}
505+
506+
type testStateStore struct {
507+
Store *statestore.Store
508+
GCPeriod time.Duration
509+
}
510+
509511
func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts }
510512
func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod }
511-
func (ts testStateStore) Access(string) (*statestore.Store, error) {
513+
func (ts testStateStore) StoreFor(string) (*statestore.Store, error) {
512514
if ts.Store == nil {
513515
return nil, errors.New("no store configured")
514516
}

Diff for: filebeat/input/journald/environment_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
125125
}, 5*time.Second, 10*time.Millisecond, &msg)
126126
}
127127

128+
var _ statestore.States = (*testInputStore)(nil)
129+
128130
type testInputStore struct {
129131
registry *statestore.Registry
130132
}
@@ -139,7 +141,7 @@ func (s *testInputStore) Close() {
139141
s.registry.Close()
140142
}
141143

142-
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
144+
func (s *testInputStore) StoreFor(string) (*statestore.Store, error) {
143145
return s.registry.Get("filebeat")
144146
}
145147

Diff for: filebeat/input/journald/input.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/elastic/beats/v7/libbeat/feature"
3232
"github.com/elastic/beats/v7/libbeat/reader"
3333
"github.com/elastic/beats/v7/libbeat/reader/parser"
34+
"github.com/elastic/beats/v7/libbeat/statestore"
3435
conf "github.com/elastic/elastic-agent-libs/config"
3536
"github.com/elastic/elastic-agent-libs/logp"
3637
)
@@ -70,7 +71,7 @@ const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL"
7071
const pluginName = "journald"
7172

7273
// Plugin creates a new journald input plugin for creating a stateful input.
73-
func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin {
74+
func Plugin(log *logp.Logger, store statestore.States) input.Plugin {
7475
return input.Plugin{
7576
Name: pluginName,
7677
Stability: feature.Stable,

Diff for: filebeat/input/journald/input_filtering_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) {
256256
env := newInputTestingEnvironment(t)
257257

258258
if testCase.cursor != "" {
259-
store, _ := env.stateStore.Access("")
259+
store, _ := env.stateStore.StoreFor("")
260260
tmp := map[string]any{}
261261
if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil {
262262
t.Fatal(err)

Diff for: filebeat/input/journald/input_stub.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ package journald
2121

2222
import (
2323
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
24-
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
24+
"github.com/elastic/beats/v7/libbeat/statestore"
2525
"github.com/elastic/elastic-agent-libs/logp"
2626
)
2727

28-
func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin {
28+
func Plugin(log *logp.Logger, store statestore.States) v2.Plugin {
2929
return v2.Plugin{}
3030
}

Diff for: filebeat/input/v2/input-cursor/manager.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ import (
4848
type InputManager struct {
4949
Logger *logp.Logger
5050

51-
// StateStore gives the InputManager access to the persitent key value store.
52-
StateStore StateStore
51+
// StateStore gives the InputManager access to the persistent key value store.
52+
StateStore statestore.States
5353

5454
// Type must contain the name of the input type. It is used to create the key name
5555
// for all sources the inputs collect from.
@@ -80,12 +80,6 @@ var (
8080
errNoInputRunner = errors.New("no input runner available")
8181
)
8282

83-
// StateStore interface and configurations used to give the Manager access to the persistent store.
84-
type StateStore interface {
85-
Access(typ string) (*statestore.Store, error)
86-
CleanupInterval() time.Duration
87-
}
88-
8983
// init initializes the state store
9084
// This function is called from:
9185
// 1. InputManager::Init on beat start

0 commit comments

Comments
 (0)