Skip to content

Commit 1ba8365

Browse files
Orion7rpupillordRogueJinAmoebaProtozoaleeraya
authored
executor、server、planner: Increase the support of the returning keyword (#83)
* Add returning keyword plan and executor for supporting PG compliant DML (#69) * Support return result set after deletion Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * add returning plan and exectuor modify adapter.go to execute returning separately Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> Co-authored-by: RogueJin <21214103+RogueJin@users.noreply.github.com> * upload delete returning.png * upgrade DCparser and fix lint issues (#70) Signed-off-by: RogueJin <21214103+RogueJin@users.noreply.github.com> * Hacking demo (#71) * Readme: Updated readme and modified the version of TiDB cluster deployed (#50) * privileges: fixed unit test of user authentication Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * fixed error msg and added const Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * executor: Modify encrypted ciphertext in ut Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * planner: Implement some interfaces in the test to solve the problem of build failed Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * planner: Implement some interfaces in the test to solve the problem of build failed Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * update readme * Go Mod, Makefile: Dependency update (#51) * added coverage report to gitignore Signed-off-by: David <davidyangad@gmail.com> * added dev-tmp to skip explaintest Signed-off-by: David <davidyangad@gmail.com> * update DCParser version to v1.3-alpha Signed-off-by: David <davidyangad@gmail.com> * changed dependency requirement for HdrHistogram Signed-off-by: David <davidyangad@gmail.com> * downgraded DCParser version to 1.25-alpha Signed-off-by: David <davidyangad@gmail.com> * Explaintest, Server: Changed Explaintest's driver (#52) * changed driver for explaintest Signed-off-by: David <davidyangad@gmail.com> * improves status handling for server during transaction Signed-off-by: David <davidyangad@gmail.com> * Executor: Added support for setting isolation level in begin (#57) * update DCParser reference Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * added setting isolation level capability in begin Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * adding documents for BEGIN statement Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * readme: update url for build status (#56) Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * executor: uncommented some previously skipped test (#54) * uncommented some previously skipped test Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * wait longer for update version Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * further enlarge test wait time for better stability Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * variable,session: add search_path system variable. (#58) * Fixed unit test bugs Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * style: fmt check Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * style: fmt check , delete blank lines Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * style: fmt check Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * Add postgres search_path system variable Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * Modifying Comment Information Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * Added extra_float_digit system variable Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * fix:Delete unnecessary comments,Change the DCParser version Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * formatting: fixed formatting issues (#59) * fixing formatting issues Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * typo fix Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fixed a bug that params description will be sent when no params in handleStmtDescription (#61) Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * infoschema , util, autoid: Compatible with some tables in pg_catalog (#63) * Compatible with some tables in pg_catalog Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * add the correct license information and fix unit test Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * expression:PostgreSQL system function support (#64) * Added the logic realization of some system functions. Added unit tests for PostgreSQL system functions. Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * update DCParser version Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * fixed built in function unit tests (#66) Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * core, executor: add the implemet of setparamtype at show, showddl, datasource, limit plan. (#67) * handle the implement of setparamtype at show, showddl or datasource plan Signed-off-by: studiolee <1964773741@qq.com> * handle the implement of setparamtype at limit plan Signed-off-by: studiolee <1964773741@qq.com> * add ut for setparamtype Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> Co-authored-by: studiolee <1964773741@qq.com> * upgrade DCparser and fix lint issues Signed-off-by: RogueJin <21214103+RogueJin@users.noreply.github.com> Co-authored-by: pupillord <51103574+pupillord@users.noreply.github.com> Co-authored-by: AmoebaProtozoa <davidyangad@gmail.com> Co-authored-by: Cue Ray <50295175+Orion7r@users.noreply.github.com> Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: studiolee <1964773741@qq.com> * merge main into hacking-demo (#79) * Readme: Updated readme and modified the version of TiDB cluster deployed (#50) * privileges: fixed unit test of user authentication Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * fixed error msg and added const Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * executor: Modify encrypted ciphertext in ut Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * planner: Implement some interfaces in the test to solve the problem of build failed Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * planner: Implement some interfaces in the test to solve the problem of build failed Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * update readme * Go Mod, Makefile: Dependency update (#51) * added coverage report to gitignore Signed-off-by: David <davidyangad@gmail.com> * added dev-tmp to skip explaintest Signed-off-by: David <davidyangad@gmail.com> * update DCParser version to v1.3-alpha Signed-off-by: David <davidyangad@gmail.com> * changed dependency requirement for HdrHistogram Signed-off-by: David <davidyangad@gmail.com> * downgraded DCParser version to 1.25-alpha Signed-off-by: David <davidyangad@gmail.com> * Explaintest, Server: Changed Explaintest's driver (#52) * changed driver for explaintest Signed-off-by: David <davidyangad@gmail.com> * improves status handling for server during transaction Signed-off-by: David <davidyangad@gmail.com> * Executor: Added support for setting isolation level in begin (#57) * update DCParser reference Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * added setting isolation level capability in begin Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * adding documents for BEGIN statement Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * readme: update url for build status (#56) Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * executor: uncommented some previously skipped test (#54) * uncommented some previously skipped test Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * wait longer for update version Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * further enlarge test wait time for better stability Signed-off-by: AmoebaProtozoa <davidyangad@gmail.com> * variable,session: add search_path system variable. (#58) * Fixed unit test bugs Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * style: fmt check Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * style: fmt check , delete blank lines Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * style: fmt check Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * Add postgres search_path system variable Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * Modifying Comment Information Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * Added extra_float_digit system variable Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * fix:Delete unnecessary comments,Change the DCParser version Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * formatting: fixed formatting issues (#59) * fixing formatting issues Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * typo fix Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fixed a bug that params description will be sent when no params in handleStmtDescription (#61) Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> * infoschema , util, autoid: Compatible with some tables in pg_catalog (#63) * Compatible with some tables in pg_catalog Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * add the correct license information and fix unit test Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * expression:PostgreSQL system function support (#64) * Added the logic realization of some system functions. Added unit tests for PostgreSQL system functions. Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * update DCParser version Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * fixed built in function unit tests (#66) Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * core, executor: add the implemet of setparamtype at show, showddl, datasource, limit plan. (#67) * handle the implement of setparamtype at show, showddl or datasource plan Signed-off-by: studiolee <1964773741@qq.com> * handle the implement of setparamtype at limit plan Signed-off-by: studiolee <1964773741@qq.com> * add ut for setparamtype Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> Co-authored-by: studiolee <1964773741@qq.com> * Executor: Sort param (#72) * added test suites for sortParamOrder Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * removed magic Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * redo ParamMakerSorter Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * use 0 index Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * added ut case and fixed compatibility with MySQL Signed-off-by: jk <51103574+pupillord@users.noreply.github.com> Co-authored-by: jk <51103574+pupillord@users.noreply.github.com> * Conn: Improved extended query handling (#73) * removed unnecessary write back to client Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * fixed extended query describe Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * Server: Improved Extended Query Protocol (#74) * binary long input Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * binary float support Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * new field to save pg oid Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * Bind binary (#75) * binary long input Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * binary float support Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * new field to save pg oid Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * passed BenchMarkSQL Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * Server: Restructure Parse OID handling (#77) * handle OID and formatCode Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * add pgOID conversion to parse handler Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * handle OID of 0 Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * restructured parse oid handling Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> * removed unnecessary OID structure in bind Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: pupillord <51103574+pupillord@users.noreply.github.com> Co-authored-by: Cue Ray <50295175+Orion7r@users.noreply.github.com> Co-authored-by: studiolee <1964773741@qq.com> * add DML statement returning compatibility * add insert returning support Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * update go mod Signed-off-by: Orion7r <50295175+Orion7r@users.noreply.github.com> * update go mod Signed-off-by: OrionRay <50295175+Orion7r@users.noreply.github.com> Co-authored-by: pupillord <51103574+pupillord@users.noreply.github.com> Co-authored-by: RogueJin <21214103+RogueJin@users.noreply.github.com> Co-authored-by: AmoebaProtozoa <davidyangad@gmail.com> Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: studiolee <1964773741@qq.com>
1 parent 8dadaf3 commit 1ba8365

25 files changed

Lines changed: 723 additions & 24 deletions

docs/delete returning.png

394 KB
Loading

executor/adapter.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ type recordSet struct {
7979
stmt *ExecStmt
8080
lastErr error
8181
txnStartTS uint64
82+
rows []chunk.Row
83+
}
84+
85+
func (a *recordSet) Rows() []chunk.Row {
86+
return a.rows
8287
}
8388

8489
func (a *recordSet) Fields() []*ast.ResultField {
@@ -364,6 +369,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
364369
}
365370

366371
if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled {
372+
//if returningRS != nil {
373+
// return returningRS, err
374+
//}
367375
return result, err
368376
}
369377

@@ -439,6 +447,10 @@ type chunkRowRecordSet struct {
439447
execStmt *ExecStmt
440448
}
441449

450+
func (c *chunkRowRecordSet) Rows() []chunk.Row {
451+
return c.rows
452+
}
453+
442454
func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
443455
return c.fields
444456
}
@@ -507,7 +519,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
507519
defer span1.Finish()
508520
ctx = opentracing.ContextWithSpan(ctx, span1)
509521
}
510-
522+
var returningRS *recordSet
511523
// Check if "tidb_snapshot" is set for the write executors.
512524
// In history read mode, we can not do write operations.
513525
switch e.(type) {
@@ -532,9 +544,57 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
532544
if err != nil {
533545
return nil, err
534546
}
547+
548+
// Do returning
549+
returningRS = getReturningRecordSet(ctx, e, a)
550+
551+
if returningRS != nil {
552+
return returningRS, err
553+
}
535554
return nil, err
536555
}
537556

557+
func getReturningRecordSet(ctx context.Context, e Executor, a *ExecStmt) *recordSet {
558+
var rs *recordSet
559+
if del, ok := e.(*DeleteExec); ok && del.returning != nil {
560+
err := del.returning.Next(ctx, del.AffectedRows())
561+
if err != nil {
562+
return rs
563+
}
564+
565+
if ret, ok := del.returning.(*ReturningExec); ok {
566+
rs = ret.ResultSet
567+
rs.stmt = a
568+
}
569+
}
570+
571+
if updt, ok := e.(*UpdateExec); ok && updt.returning != nil {
572+
err := updt.returning.Next(ctx, updt.AffectedRows())
573+
if err != nil {
574+
return rs
575+
}
576+
577+
if ret, ok := updt.returning.(*ReturningExec); ok {
578+
rs = ret.ResultSet
579+
rs.stmt = a
580+
}
581+
}
582+
583+
if insert, ok := e.(*InsertExec); ok && insert.returning != nil {
584+
err := insert.returning.Next(ctx, insert.evalBuffer4Return.ToRow().Chunk())
585+
if err != nil {
586+
return rs
587+
}
588+
589+
if ret, ok := insert.returning.(*ReturningExec); ok {
590+
rs = ret.ResultSet
591+
rs.stmt = a
592+
rs.stmt.OutputNames = ret.ReturningFields
593+
}
594+
}
595+
return rs
596+
}
597+
538598
func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
539599
sctx := a.Ctx
540600
// Do not active the transaction here.

executor/builder.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
230230
return b.buildAdminShowTelemetry(v)
231231
case *plannercore.AdminResetTelemetryID:
232232
return b.buildAdminResetTelemetryID(v)
233+
case *plannercore.PhysicalReturning:
234+
return b.buildReturning(v)
233235
default:
234236
if mp, ok := p.(MockPhysicalPlan); ok {
235237
return mp.GetExecutor()
@@ -723,6 +725,12 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
723725
InsertValues: ivs,
724726
OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...),
725727
}
728+
729+
if v.ReturningPlan != nil {
730+
insert.returning = b.build(v.ReturningPlan)
731+
insert.returning.(*ReturningExec).ReturningFields = v.OutputNames()
732+
}
733+
726734
return insert
727735
}
728736

@@ -1708,6 +1716,11 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
17081716
tblID2table: tblID2table,
17091717
tblColPosInfos: v.TblColPosInfos,
17101718
}
1719+
1720+
if v.ReturningPlan != nil {
1721+
updateExec.returning = b.build(v.ReturningPlan)
1722+
}
1723+
17111724
return updateExec
17121725
}
17131726

@@ -1721,6 +1734,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
17211734
}
17221735
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
17231736
selExec := b.build(v.SelectPlan)
1737+
17241738
if b.err != nil {
17251739
return nil
17261740
}
@@ -1732,6 +1746,11 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
17321746
IsMultiTable: v.IsMultiTable,
17331747
tblColPosInfos: v.TblColPosInfos,
17341748
}
1749+
1750+
if v.ReturningPlan != nil {
1751+
deleteExec.returning = b.build(v.ReturningPlan)
1752+
}
1753+
17351754
return deleteExec
17361755
}
17371756

@@ -3144,3 +3163,18 @@ func (b *executorBuilder) buildAdminShowTelemetry(v *plannercore.AdminShowTeleme
31443163
func (b *executorBuilder) buildAdminResetTelemetryID(v *plannercore.AdminResetTelemetryID) Executor {
31453164
return &AdminResetTelemetryIDExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID())}
31463165
}
3166+
3167+
func (b *executorBuilder) buildReturning(v *plannercore.PhysicalReturning) Executor {
3168+
childExec := b.build(v.Children()[0])
3169+
if b.err != nil {
3170+
return nil
3171+
}
3172+
3173+
returningExec := ReturningExec{
3174+
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
3175+
schema: v.Schema(),
3176+
}
3177+
executorCounterSortExec.Inc()
3178+
3179+
return &returningExec
3180+
}

executor/delete.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ type DeleteExec struct {
3737
// the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos
3838
tblColPosInfos plannercore.TblColPosInfoSlice
3939
memTracker *memory.Tracker
40+
affectedRow *chunk.Chunk
41+
42+
returning Executor
4043
}
4144

4245
// Next implements the Executor Next interface.
@@ -112,6 +115,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
112115
}
113116
rowCount++
114117
}
118+
e.AddAffectedRows(chk)
115119
chk = chunk.Renew(chk, e.maxChunkSize)
116120
}
117121

@@ -197,10 +201,32 @@ func (e *DeleteExec) Open(ctx context.Context) error {
197201
e.memTracker = memory.NewTracker(e.id, -1)
198202
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
199203

200-
return e.children[0].Open(ctx)
204+
err := e.children[0].Open(ctx)
205+
if err != nil {
206+
return err
207+
}
208+
209+
if e.returning != nil {
210+
err = e.returning.Open(ctx)
211+
if err != nil {
212+
return err
213+
}
214+
}
215+
216+
return nil
201217
}
202218

203219
// tableRowMapType is a map for unique (Table, Row) pair. key is the tableID.
204220
// the key in map[int64]Row is the joined table handle, which represent a unique reference row.
205221
// the value in map[int64]Row is the deleting row.
206222
type tableRowMapType map[int64]map[int64][]types.Datum
223+
224+
// AffectedRows get affected rows.
225+
func (e *DeleteExec) AffectedRows() *chunk.Chunk {
226+
return e.affectedRow
227+
}
228+
229+
// AddAffectedRows adds affected row.
230+
func (e *DeleteExec) AddAffectedRows(chk *chunk.Chunk) {
231+
e.affectedRow = chk
232+
}

executor/insert.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@ import (
3737
// InsertExec represents an insert executor.
3838
type InsertExec struct {
3939
*InsertValues
40-
OnDuplicate []*expression.Assignment
41-
evalBuffer4Dup chunk.MutRow
42-
curInsertVals chunk.MutRow
43-
row4Update []types.Datum
40+
OnDuplicate []*expression.Assignment
41+
evalBuffer4Dup chunk.MutRow
42+
evalBuffer4Return chunk.MutRow
43+
curInsertVals chunk.MutRow
44+
row4Update []types.Datum
45+
returning Executor
4446

4547
Priority mysql.PriorityEnum
4648
}
@@ -312,6 +314,10 @@ func (e *InsertExec) Open(ctx context.Context) error {
312314
if !e.allAssignmentsAreConstant {
313315
e.initEvalBuffer()
314316
}
317+
if e.returning != nil {
318+
e.initEvalBuffer4Return()
319+
}
320+
315321
return nil
316322
}
317323

@@ -338,6 +344,27 @@ func (e *InsertExec) initEvalBuffer4Dup() {
338344
e.row4Update = make([]types.Datum, 0, len(evalBufferTypes))
339345
}
340346

347+
func (e *InsertExec) initEvalBuffer4Return() {
348+
// Use public columns for new row.
349+
numCols := len(e.Table.Cols())
350+
// Use writable columns for old row for update.
351+
numWritableCols := len(e.Table.WritableCols())
352+
353+
evalBufferTypes := make([]*types.FieldType, 0, numCols+numWritableCols)
354+
355+
// Append the old row before the new row, to be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
356+
for _, col := range e.Table.WritableCols() {
357+
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
358+
}
359+
for _, col := range e.Table.Cols() {
360+
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
361+
}
362+
if e.hasExtraHandle {
363+
evalBufferTypes = append(evalBufferTypes, types.NewFieldType(mysql.TypeLonglong))
364+
}
365+
e.evalBuffer4Return = chunk.MutRowFromTypes(evalBufferTypes[numWritableCols:])
366+
}
367+
341368
// doDupRowUpdate updates the duplicate row.
342369
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []types.Datum, newRow []types.Datum,
343370
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {

executor/returning.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2021 Digital China Group Co.,Ltd
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package executor
15+
16+
import (
17+
"context"
18+
"github.com/pingcap/tidb/types"
19+
"runtime/trace"
20+
"time"
21+
22+
"github.com/pingcap/tidb/expression"
23+
"github.com/pingcap/tidb/util/chunk"
24+
"github.com/pingcap/tidb/util/execdetails"
25+
)
26+
27+
// ReturningExec represents Returning Executor
28+
type ReturningExec struct {
29+
baseExecutor
30+
31+
Idx int
32+
fetched bool
33+
schema *expression.Schema
34+
ReturningFields types.NameSlice
35+
ResultSet *recordSet
36+
}
37+
38+
// Open Returning Executor
39+
func (e *ReturningExec) Open(ctx context.Context) error {
40+
41+
return e.children[0].Open(ctx)
42+
}
43+
44+
// Next Returning Executor
45+
func (e *ReturningExec) Next(ctx context.Context, req *chunk.Chunk) error {
46+
return e.fetchRowChunks(ctx, req)
47+
}
48+
49+
// Close Returning Executor
50+
func (e *ReturningExec) Close() error {
51+
return e.children[0].Close()
52+
}
53+
54+
// Returning Executor fetchRowChunks
55+
func (e *ReturningExec) fetchRowChunks(ctx context.Context, req *chunk.Chunk) error {
56+
defer func() {
57+
e.fetched = true
58+
}()
59+
60+
var stmtDetail *execdetails.StmtExecDetails
61+
stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
62+
if stmtDetailRaw != nil {
63+
stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
64+
}
65+
66+
rs := &recordSet{
67+
executor: e.base().children[0],
68+
}
69+
e.ResultSet = rs
70+
rs.NewChunk()
71+
if req == nil {
72+
return nil
73+
}
74+
75+
rowCount := req.NumRows()
76+
if rowCount == 0 {
77+
return nil
78+
}
79+
start := time.Now()
80+
reg := trace.StartRegion(ctx, "ProcessReturning")
81+
iter := chunk.NewIterator4Chunk(req)
82+
for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() {
83+
rs.rows = append(rs.rows, chunkRow)
84+
}
85+
e.ResultSet = rs
86+
if stmtDetail != nil {
87+
stmtDetail.WriteSQLRespDuration += time.Since(start)
88+
}
89+
reg.End()
90+
91+
return nil
92+
}

0 commit comments

Comments
 (0)