Skip to content

Commit 6758a19

Browse files
refactor: Making the collectors concurrent (#1099)
* refactor: Making different collectors async and renaming file * refactor: Taking language, source and target framework as input and doing validation. * refactor: Detecting programming language and framework from the source code. 2. If target framework is missing, fallback to source framework. * refactor: Using asyncrunner from utils to make concurrent calls. * refactor: adding logs.
1 parent d2231ff commit 6758a19

File tree

6 files changed

+257
-85
lines changed

6 files changed

+257
-85
lines changed

assessment/assessment_engine.go

+54-14
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"context"
1919
"fmt"
2020
"strings"
21+
"sync"
2122

2223
assessment "github.com/GoogleCloudPlatform/spanner-migration-tool/assessment/collectors"
2324
"github.com/GoogleCloudPlatform/spanner-migration-tool/assessment/sources/mysql"
2425
"github.com/GoogleCloudPlatform/spanner-migration-tool/assessment/utils"
26+
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/task"
2527
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
2628
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
2729
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
@@ -35,6 +37,11 @@ type assessmentCollectors struct {
3537
appAssessmentCollector *assessment.MigrationCodeSummarizer
3638
}
3739

40+
type assessmentTaskInput struct {
41+
taskName string
42+
taskFunc func(ctx context.Context, c assessmentCollectors) (utils.AssessmentOutput, error)
43+
}
44+
3845
func PerformAssessment(conv *internal.Conv, sourceProfile profiles.SourceProfile, assessmentConfig map[string]string, projectId string) (utils.AssessmentOutput, error) {
3946

4047
logger.Log.Info("performing assessment")
@@ -56,15 +63,48 @@ func PerformAssessment(conv *internal.Conv, sourceProfile profiles.SourceProfile
5663
// Iterate over assessment rules and order output by confidence of each element. Merge outputs where required
5764
// Select the highest confidence output for each attribute
5865
// Populate assessment struct
66+
parallelTaskRunner := &task.RunParallelTasksImpl[assessmentTaskInput, utils.AssessmentOutput]{}
67+
68+
assessmentTasksInput := []assessmentTaskInput{
69+
{
70+
taskName: "schemaAssessment",
71+
taskFunc: func(ctx context.Context, c assessmentCollectors) (utils.AssessmentOutput, error) {
72+
result, err := performSchemaAssessment(ctx, c)
73+
return utils.AssessmentOutput{SchemaAssessment: result}, err
74+
},
75+
},
76+
{
77+
taskName: "appAssessment",
78+
taskFunc: func(ctx context.Context, c assessmentCollectors) (utils.AssessmentOutput, error) {
79+
result, err := performAppAssessment(ctx, c)
80+
return utils.AssessmentOutput{AppCodeAssessment: result}, err
81+
},
82+
},
83+
}
84+
85+
assessmentResults, err := parallelTaskRunner.RunParallelTasks(assessmentTasksInput, 2, func(input assessmentTaskInput, mutex *sync.Mutex) task.TaskResult[utils.AssessmentOutput] {
86+
result, err := input.taskFunc(ctx, c)
87+
if err != nil {
88+
logger.Log.Error(fmt.Sprintf("could not complete %s: ", input.taskName), zap.Error(err))
89+
}
90+
return task.TaskResult[utils.AssessmentOutput]{Result: result, Err: err}
91+
}, false)
5992

60-
output.SchemaAssessment, err = performSchemaAssessment(ctx, c)
6193
if err != nil {
62-
logger.Log.Info(fmt.Sprintf("could not complete schema assessment: %s", err))
94+
// Handle any error from the parallel task runner itself
6395
return output, err
6496
}
65-
output.AppCodeAssessment, err = performAppAssessment(ctx, c)
6697

67-
return output, err
98+
for _, result := range assessmentResults {
99+
if result.Result.SchemaAssessment != nil {
100+
output.SchemaAssessment = result.Result.SchemaAssessment
101+
}
102+
if result.Result.AppCodeAssessment != nil {
103+
output.AppCodeAssessment = result.Result.AppCodeAssessment
104+
}
105+
}
106+
107+
return output, nil
68108
}
69109

70110
// Initilize collectors. Take a decision here on which collectors are mandatory and which are optional
@@ -81,13 +121,10 @@ func initializeCollectors(conv *internal.Conv, sourceProfile profiles.SourceProf
81121
}
82122
c.infoSchemaCollector = &infoSchemaCollector
83123

84-
//Initiialize App Assessment Collector
85-
124+
//Initialize App Assessment Collector
86125
language, exists := assessmentConfig["language"]
87-
if !exists {
88-
// defaulting to Golang
89-
language = "go"
90-
}
126+
sourceFramework, exists := assessmentConfig["sourceFramework"]
127+
targetFramework, exists := assessmentConfig["targetFramework"]
91128

92129
codeDirectory, exists := assessmentConfig["codeDirectory"]
93130
if exists {
@@ -104,7 +141,7 @@ func initializeCollectors(conv *internal.Conv, sourceProfile profiles.SourceProf
104141
logger.Log.Debug("spannerSchema", zap.String("schema", spannerSchema))
105142

106143
summarizer, err := assessment.NewMigrationCodeSummarizer(
107-
ctx, nil, projectId, assessmentConfig["location"], mysqlSchema, spannerSchema, codeDirectory, language)
144+
ctx, nil, projectId, assessmentConfig["location"], mysqlSchema, spannerSchema, codeDirectory, language, sourceFramework, targetFramework)
108145
if err != nil {
109146
logger.Log.Error("error initiating migration summarizer")
110147
return c, err
@@ -118,8 +155,9 @@ func initializeCollectors(conv *internal.Conv, sourceProfile profiles.SourceProf
118155
return c, err
119156
}
120157

121-
func performSchemaAssessment(ctx context.Context, collectors assessmentCollectors) (utils.SchemaAssessmentOutput, error) {
122-
schemaOut := utils.SchemaAssessmentOutput{}
158+
func performSchemaAssessment(ctx context.Context, collectors assessmentCollectors) (*utils.SchemaAssessmentOutput, error) {
159+
logger.Log.Info("starting schema assessment...")
160+
schemaOut := &utils.SchemaAssessmentOutput{}
123161

124162
srcTableDefs, spTableDefs := collectors.infoSchemaCollector.ListTables()
125163
srcColDefs, spColDefs := collectors.infoSchemaCollector.ListColumnDefinitions()
@@ -180,6 +218,7 @@ func performSchemaAssessment(ctx context.Context, collectors assessmentCollector
180218
schemaOut.ViewAssessmentOutput = collectors.infoSchemaCollector.ListViews()
181219
schemaOut.SpSequences = collectors.infoSchemaCollector.ListSpannerSequences()
182220

221+
logger.Log.Info("schema assessment completed successfully.")
183222
return schemaOut, nil
184223
}
185224

@@ -190,7 +229,7 @@ func performAppAssessment(ctx context.Context, collectors assessmentCollectors)
190229
return nil, nil
191230
}
192231

193-
logger.Log.Info("adding app assessment details")
232+
logger.Log.Info("starting app assessment...")
194233
codeAssessment, err := collectors.appAssessmentCollector.AnalyzeProject(ctx)
195234

196235
if err != nil {
@@ -200,6 +239,7 @@ func performAppAssessment(ctx context.Context, collectors assessmentCollectors)
200239

201240
logger.Log.Debug("snippets: ", zap.Any("codeAssessment.Snippets", codeAssessment.Snippets))
202241

242+
logger.Log.Info("app assessment completed successfully.")
203243
return &utils.AppCodeAssessmentOutput{
204244
Language: codeAssessment.Language,
205245
Framework: codeAssessment.Framework,

0 commit comments

Comments
 (0)