赐我白日梦资料

本文主要介绍赐我白日梦资料 方法和在新技术下所面对的“挑战”,方便大家深入理解赐我白日梦资料 过程。本文也将分享赐我白日梦资料 所遇到的问题和应对策略,怎么解决怎么做的问题。
通过深入本文可以理解代码原理,进行代码文档的下载,也可以查看相应 Demo 部署效果。

目录
  • 一、什么是DDL?
  • 二、表级锁和元数据锁
    • 2.1、什么是表锁?
    • 2.2、什么是MDL?
  • 三、什么是无损DDL?
  • 四、DDL重建表
    • Mysql5.5之前重建表
    • Mysql5.6之后重建表
  • 五、ghost工具源码梳理
    • 5.1、工作模式
      • 5.1.1 主从模式a
      • 5.1.2 主主模式b
      • 5.1.3 migrate/test on relica
    • 5.1、前置性检查
    • 5.2、创建streamer监听binlog
    • 5.3、创建xxx_ghc表,xxx_gho表。
    • 5.4 开始迁移数据
    • 5.5 cut-over
    • 5.6、如何保证数据一致性
      • 5.6.1、两种情况
      • 5.6.2、对于insert
      • 5.6.3 、对于update
      • 5.6.4、对于delete
  • 九、ghost的暂停、继续、限流、终止
  • 十、写有中文注释的ghost源码项目

一、什么是DDL?

DDL全称:Data Definition Language

它包含三个主要的关键字:create、drop、alter

操作 statement
创建数据库 create database
删除数据库 drop database
修改数据库 alter database
创建表 create table
删除表 drop table
修改表 alter table
创建索引 create index
删除索引 drop index

二、表级锁和元数据锁

MySQL的表级锁有两种,一种是表锁,一种是元数据锁MDL

2.1、什么是表锁?

表锁的语法: lock tables … read/write

释放时机:通过unlock tables 主动释放,当客户端断开时也会自动释放。

例如:线程A执行: lock tables t1 read, t2 write; 那其他线程写t1和读写t2时都会被阻塞, 而且线程A在unlock tables 之前,也只能执行读t1,读写t2,它自己也不被允许写t1。

赐我白日梦

2.2、什么是MDL?

元数据锁也是一种表级锁:metadata lock。

作用:

我们不需要显示的使用它,当访问一个表的时候,它会被自动加上。MDL锁的作用就是保证读写的正确性

说白了,就是实现:当有用户对表执行DML相关操作时,其他线程不能把表结构给改了(想改表结构也可以,等排在它前面的DML全部执行完)。反之,当有线程在更改表结构时,其他线程需要执行的DML也会被阻塞住。

特性:

1、系统默认添加。

2、读锁之间不互斥。

3、读写锁之间互斥。

三、什么是无损DDL?

需求:

一般对公司对业务线来说,总是难免遇到需要修改线上表结构的需求。比如搞个活动,结果发现:现有的表中的列不够用了,那么就需要对现有的表进行无损DDL操作,添加一列。

有损DDL

为什么直接执行alter table add column有损

如下图所示:你alter table时是需要获取元数据锁的写锁的,而所有的DML操作又会被默认的加上元数据读锁。如果所有的语句都是DML语句那皆大欢喜,大家都是读锁彼此不影响。

赐我白日梦

但是你看上图这突然整出来一个alter语句,一旦等他持有写锁后,去执行DDL语句时期间,所有的DML语句全部被阻塞,我们称这中情况对业务来说是有损的。

无损DDL

所谓的无损是相对于业务来说的,如果能做到执行DDL的过程中,对业务无影响,那我们称这种ddl是无损的。

至于如何无损的解决这个问题,接着看下文。

四、DDL重建表

什么是重建表?为什么要重建表?

当我执行delete语句删除表A中的数据时,对应Innodb来说其实只是在标记删除,而是不实实在在的将表空间中的数据删除,对应innodb来将被标记删除的位置是可以可重复使用。

那么delete语句多了,表空间上的空洞就多了,磁盘的占用量也只增不减。这时我们就得重建表。缩小表A上的空洞

重建表的方法:

方式1、可以新建一个新的表,然后将原表中的数据按照id生序一次拷贝过去。

方式2、也可以执行alter table A engine=InnoDB 来重建表。

这里的alter table 其实就是DDL语句

Mysql5.5之前重建表

在5.5之前,mysql执行alter table A engine=InnoDB的流程如下图:

赐我白日梦

在上面的过程中,MySQL会自动的为我们创建临时表,拷贝数据,交换表名,以及删除旧表。

特点:

一、这个过程并不是安全的。因为在往tmp表中写数据的过程中,如果有业务流量写入表A,而且写入的位置是不久前完成往tmp中拷贝的位置,就会导致数据的丢失。

二、即使是MySQL会我们自动的创建临时表,数据拷贝的过程依然是在MySQL-Server层面做的。

Mysql5.6之后重建表

重建表的过程如下图:

赐我白日梦

1、创建一个tmp_file, 扫描表A主键的所有数据页。

2、使用数据页中的记录生成B+树,存储进tmp_file中。

这一步是对针对数据文件的操作。由innodb直接完成。

3、在生成转存B+数的过程中,将针对A的写操作记录在row_log日志中。

4、完成了B+树的转存后,将row_log中记录的日志在tmp_file中回放。

5、使用临时文件替换A中的数据文件。

可以看到,这个过程其实已经实现无损了。因为在做数据迁移的过程中,允许对原表进行CRUD

局限性:

这种DDL本质上是在替换表空间中的数据文件,仅仅是用于对原表进行无损DDL瘦身。而不是解决我们开题所说的动态无损加列的情况。

五、ghost工具源码梳理

5.1、工作模式

赐我白日梦

5.1.1 主从模式a

ghost会连接上主库

从主库中读取数据rowCopy主库上的影子表中。

添加对从库binlog的监听。将binlog-event转换成sql应用在主库上的影子表上。

因为ghost回去解析binlog,所以要求从库的binlog格式必须上row格式。不对主库的binlog格式有要求。

cutOver在主库上完成。

5.1.2 主主模式b

ghost会连接上主库

从主库中读取数据rowCopy主库上的影子表中。

添加对主库binlog的监听。将binlog-event转换成sql应用在主库上的影子表上。

要求主库的binlog格式为row格式。

cutOver在主库上完成。

5.1.3 migrate/test on relica

一般这中情况是在做预检查时完成才使用到的,ghost的任何操作都在从库上完成,主要是验证整个流程是否可以跑通,相关参数:-test-on-replica

上面所说的主主、主从、并不是MySQL实例的主从关系。说的是 rowCopy和binlog的同步是在谁身上进行。

5.1、前置性检查

这一步主要是去检查从库的基础信息。比如执行table row count、schema validation、hearbeat 但是当我们有提供 –allow-on-master时,inspector指的时主库。

  • 校验alter语句,允许重命名列名,但是不允许重命名表名
validateStatement() 
  • 测试DB的连通性: 实现的思路是使用根据用户输入的数据库连接信息获取到和主/从库的连接,然后使用db.QueryRow(sqlStr)执行指定的SQL,观察获取到的结果是否符合预期。
versionQuery := `select @@global.version` err := db.QueryRow(versionQuery).Scan(&version);   extraPortQuery := `select @@global.extra_port` db.QueryRow(extraPortQuery).Scan(&extraPort);  portQuery := `select @@global.port` db.QueryRow(portQuery).Scan(&port);  
  • 权限校验,确保用户给定sql对相应的库表有足够的操作权限:思路获取db连接,执行如下的sql;将可能的情况枚举出来,和sql返回的语句比对
`show /* gh-ost */ grants for current_user()`  在控制台执行sql的shili  mysql> show  grants for current_user(); +-------------------------------------------------------------+ | Grants for root@%                                           | +-------------------------------------------------------------+ | GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION | +-------------------------------------------------------------+ 1 row in set (0.01 sec)  	err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { 		for _, grantData := range rowMap { 			grant := grantData.String 			if strings.Contains(grant, `GRANT ALL PRIVILEGES ON *.*`) { 				foundAll = true 			} 			if strings.Contains(grant, `SUPER`) && strings.Contains(grant, ` ON *.*`) { 				foundSuper = true 			}       ...                                   if foundAll { 		log.Infof("User has ALL privileges") 		return nil 	}                              
  • 校验binlog的格式

实现的思路同样是执行下面的sql,查看bin-log是否是row格式。以及binlog_row_image是否是FULL格式。

从库强制要求:binlog为 ROW模式,还要开启log_slave_updates(告诉从服务器将其SQL线程执行的更新记入到从服务器自己的binlog中)。

为什么会这么要求binlog为row格式?

rowlevel的日志内容会非常清楚的记录下每一行数据修改的细节,而ghost有专门的go协程专门负责解析binlog同步增量数据。

相关参数:

–switch-to-rbr 作用:让gh-ost自动将从库的binlog_format转换为ROW格式。(ghost不会将格式还原回之前的状态)

–assume-rbr 作用: 如果你确定从库的bin-log格式就是row格式。可以使用这个参数,他可以保证禁止从库上运行stop slave,start slave

query := `select @@global.log_bin, @@global.binlog_format` this.db.QueryRow(query).Scan(&hasBinaryLogs, &this.migrationContext.OriginalBinlogFormat);   query = `select @@global.binlog_row_image`	 this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage);  #正常从控制台执行命令得到的结果如下: mysql> select @@global.log_bin, @@global.binlog_format; +------------------+------------------------+ | @@global.log_bin | @@global.binlog_format | +------------------+------------------------+ |                1 | ROW                    | +------------------+------------------------+ 1 row in set (0.01 sec)  mysql> select @@global.binlog_row_image; +---------------------------+ | @@global.binlog_row_image | +---------------------------+ | FULL                      | +---------------------------+ 1 row in set (0.00 sec) 

5.2、创建streamer监听binlog

这一步同样是在 migrator.go的Migrate()this.initiateInspector(); 函数中。

首先创建一个 eventsStreamer,因为他要求同步big-log,所以为他初始化一个DB连接。 if err := this.eventsStreamer.InitDBConnections(); err != nil { 		return err } // 细节: // 获取连接数据库的url: this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) // root:root@tcp(127.0.0.1:3307)/lossless_ddl_test?interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1&tls=false // 其中参数:interpolateParams=true用于防止sql注入攻击 // 其中参数:autocommit=true 表示每一条sql都当做一个事物自动提交,一般推荐这样做,如果不自动提交的话很容易出现长事物,系统也会因为这个长事物维护很大的readView占用存储空间。还可能长时间占用锁资源不释放,增加死锁的几率。  // 获取DB实例 mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri);  // 校验连接的可用性 base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext); 	-- 思路还是和5.1小节一致,使用db.QueryRow(versionQuery).Scan(&version); 执行sql,观察结果 	-- `select @@global.version` 	-- `select @@global.extra_port`   -- `select @@global.port`  //获取当前binlog的位点信息 //在ghost启动的时候会先获取mysql集群的bin-log状态,因为ghost的设计哲学是,现有的数据从原表select出来灌进影子表,在同步的过程中增量的数据通过解析重放binlog来实现,那获取集群中当前的bin-log信息自然也是master中读取:如下: query := `show /* gh-ost readCurrentBinlogCoordinates */ master status` foundMasterStatus := false err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {  //正常从console中执行命令获取到的结果如下:   mysql> show  master status; +------------------+----------+--------------+------------------+---------------------------------------------+ | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                           | +------------------+----------+--------------+------------------+---------------------------------------------+ | MySQL-bin.000007 |      194 |              |                  | a89eec96-b882-11ea-ade2-0242ac110002:1-8844 | +------------------+----------+--------------+------------------+---------------------------------------------+   //获取到当前binlog位点后,ghost会将自己伪装成一个replica连接上master,同步master的binlog //具体的实现依赖第三方类库:	"github.com/siddontang/go-mysql/replication" //调用 *replication.BinlogSyncer 的如下方法同步bin-log func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {      // 这是你可以尝试往主库写几条语句,然后flush log,ghost是能感知到的 [info] binlogsyncer.go:723 rotate to (MySQL-bin.000008, 4) INFO rotate to next log from MySQL-bin.000008:6074 to MySQL-bin.000008 [info] binlogsyncer.go:723 rotate to (MySQL-bin.000008, 4) INFO rotate to next log from MySQL-bin.000008:0 to MySQL-bin.000008  

5.3、创建xxx_ghc表,xxx_gho表。

// 创建applier NewApplier(this.migrationContext)  // 初始化DB连接(和上面说过的步骤雷同) this.applier.InitDBConnections();  // 根据配置判断是否删除ghost表。相关的配置参数:--initially-drop-old-table // 所谓的ghost就是xxx_ghc表,xxx_gho表,一个是日志表,一个是影子表,他们是ghost创建的表 // 其中前者中存放ghost打印的日志,后者是未来替换现以后表的影子表。ghost在这里判断,如果mysql实例中已经存在这两个表,是不允许进行剩下的任务的,但是可以使用--initially-drop-old-table参数配置,ghost在启动的过程中碰到这个表就把他删除。(ghost任务他们都是残留表) if this.migrationContext.InitiallyDropGhostTable { 		if err := this.DropGhostTable(); err != nil { 			return err 		} 	} // 删除表的语句如下: drop /* gh-ost */ table if exists `lossless_ddl_test`.`_user_gho` drop /* gh-ost */ table if exists `lossless_ddl_test`.`_user_del`  // 创建日志表,建表语句如下 create /* gh-ost */ table `lossless_ddl_test`.`_user_ghc` ( 			id bigint auto_increment, 			last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, 			hint varchar(64) charset ascii not null, 			value varchar(4096) charset ascii not null, 			primary key(id), 			unique key hint_uidx(hint) 		) auto_increment=256 		  // 创建影子表,一开始创建的影子表其实就是原表。 create /* gh-ost */ table `lossless_ddl_test`.`_user_gho` like `lossless_ddl_test`.`user`  // 对影子表执行alter语句,alter语句不在原表上执行也就不会出现所谓的表级锁或者MDL写锁去阻塞业务方的sql alter /* gh-ost */ table `lossless_ddl_test`.`_user_gho` add column newC6 varchar(24);  // 写心跳记录 			insert /* gh-ost */ into `lossless_ddl_test`.`_user_ghc` 				(id, hint, value) 			values 				(NULLIF(?, 0), ?, ?) 			on duplicate key update 				last_update=NOW(), 				value=VALUES(value) 	 			insert /* gh-ost */ into `lossless_ddl_test`.`_user_ghc` 				(id, hint, value) 			values 				(NULLIF(?, 0), ?, ?) 			on duplicate key update 				last_update=NOW(), 				value=VALUES(value) 			 // 这时可以去数据库中查看ghost mysql> select * from _user_ghc; +-----+---------------------+------------------------------+--------------------+ | id  | last_update         | hint                         | value              | +-----+---------------------+------------------------------+--------------------+ |   2 | 2020-07-25 19:55:11 | state                        | GhostTableMigrated | | 256 | 2020-07-25 19:55:39 | state at 1595678112426190000 | GhostTableMigrated | +-----+---------------------+------------------------------+--------------------+   // 接着获取slave的状态,主要是获到slave落后于master到秒数 // 获取到方法如下:通过show slave status查看从库的主从同步状态,其中的Slave_IO_Running和Slave_SQL_Running为YES说明主从同步正常工作,Seconds_Behind_Master为当前的从库中的数据落后于主库的秒数 	err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`,              func(m sqlutils.RowMap) error { 								slaveIORunning := m.GetString("Slave_IO_Running") 								slaveSQLRunning := m.GetString("Slave_SQL_Running") 								secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")  

5.4 开始迁移数据

在开始migration数据之前做一些检查工作

// 校验master和slave表结构是否相同,具体的实现会分别获取到他们的列的name然后比较 table structure is not identical between master and replica  // 获取到原表和ghost表唯一键的交集 sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys)   // 校验唯一键。 for i, sharedUniqueKey := range sharedUniqueKeys { 		this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &sharedUniqueKey.Columns) 		uniqueKeyIsValid := true     // 校验唯一键的类型,ghost不支持FloatColumnType和Json列。 		for _, column := range sharedUniqueKey.Columns.Columns() { 			switch column.Type { 			case sql.FloatColumnType: 				{ 					log.Warning("Will not use %+v as shared key due to FLOAT data type", sharedUniqueKey.Name) 					uniqueKeyIsValid = false 				} 			case sql.JSONColumnType: 				{ 					// Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code 					// will remain in place to potentially handle the future case where JSON is supported in indexes. 					log.Warning("Will not use %+v as shared key due to JSON data type", sharedUniqueKey.Name) 					uniqueKeyIsValid = false 				} 			} 		}  // 检验选出的唯一键,如果没有选出唯一键的话报错求助。 if this.migrationContext.UniqueKey == nil { 		return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out") }     // 检查唯一键是否可以是空的。默认是不允许唯一键为空的,如果没办法改变让他不为空,ghost也提供了参数去适配   if this.migrationContext.UniqueKey.HasNullable { 		if this.migrationContext.NullableUniqueKeyAllowed { 			log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey) 		} else { 			return fmt.Errorf("Chosen key (%s) has nullable columns. Bailing out. To force this operation to continue, supply --allow-nullable-unique-key flag. Only do so if you are certain there are no actual NULL values in this key. As long as there aren't, migration should be fine. NULL values in columns of this key will corrupt migration's data", this.migrationContext.UniqueKey) 		} 	}  // 获取原表和影子表的交集列 this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.ColumnRenameMap) //打印日志 INFO Shared columns are id,status,newC1,newC2,newC3,newC5,newC4  //做其他额外的检查 

做完了上面的检查工作就可以真正的迁移数据了。

数据迁移部分的主要逻辑在如下函数中

  // 统计当前有多少行   // 具体实现:新开协程,用于统计一共多少行,执行如下如下sql   // select /* gh-ost */ count(*) as rows from %s.%s`   if err := this.countTableRows(); err != nil { 		return err 	} 	 	// 添加DML语句监听器   // addDMLEventsListener开始监听原始表上的binlog事件,并为每个此类事件创建一个写任务并将其排队。    // 由executeWriteFuncs专门负责消费这个队列。 	if err := this.addDMLEventsListener(); err != nil { 		return err 	}    // 获取迁移的范围   //  -- 执行sql:获取最小的id值   //    select /* gh-ost `lossless_ddl_test`.`user` */ `id` 	//			from 	//				`lossless_ddl_test`.`user` 	//  		order by 	//				`id` asc 	//			limit 1 				 	//   -- 执行sql:获取最大id值   //   	select /* gh-ost `lossless_ddl_test`.`user` */ `id` 	//				from 	//					`lossless_ddl_test`.`user` 	//  			order by 	//					`id` desc 	//				limit 1 	if err := this.applier.ReadMigrationRangeValues(); err != nil { 		return err 	}    // 这两个协程分别是迁移任务的执行者。   // 和迁移任务的创建者。 	go this.executeWriteFuncs() 	go this.iterateChunks() 

下图是:上面代码中最后两个协程之间是如何配合工作的逻辑图

![未命名文件 (1)](/Users/dxm/Downloads/未命名文件 (1).png)

通过上图可以看出其中的executeWrite的主要作用其实是执行任务。

那他要执行的任务有主要有两种:

  • 数据迁移任务:copyRowsFunc()
  • 同步binlog事件的函数:ApplyDMLEventQueries()

其中copyRowsFunc()如下:

	copyRowsFunc := func() error { 			if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { 				// Done. There's another such check down the line 				return nil 			}  			// 当hasFurterRange为false时,原始表可能被写锁定,       // CalculateNextIterationRangeEndValues将永远挂起 			hasFurtherRange := false 			if err := this.retryOperation(func() (e error) { 				hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues() 				return e 			}); err != nil { 				return terminateRowIteration(err) 			} 			if !hasFurtherRange { 				atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) 				return terminateRowIteration(nil) 			} 			// Copy task: 			applyCopyRowsFunc := func() error { 				if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { 					return nil 				} 				_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() 				if err != nil { 					return err // wrapping call will retry 				} 				atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) 				atomic.AddInt64(&this.migrationContext.Iteration, 1) 				return nil 			} 			if err := this.retryOperation(applyCopyRowsFunc); err != nil { 				return terminateRowIteration(err) 			} 			return nil 		} 

如上函数有个需要
赐我白日梦资料部分资料来自网络,侵权毕设源码联系删除

区块链毕设网(www.qklbishe.com)全网最靠谱的原创区块链毕设代做网站
部分资料来自网络,侵权联系删除!
资源收费仅为搬运整理打赏费用,用户自愿支付 !
qklbishe.com区块链毕设代做网专注|以太坊fabric-计算机|java|毕业设计|代做平台 » 赐我白日梦资料

提供最优质的资源集合

立即查看 了解详情