column:一个具有位图索引的存储库,具有高性能、列式、支持内存存储的特点,Golang实现
目录索引
column是一个具有位图索引的列式内存存储库。
此软件包包含一个高性能的列式内存存储引擎,支持零分配和位图索引的快速查询、更新和迭代。
特征
- 优化的缓存友好型列式数据布局,可最大程度地减少缓存未命中。
- 针对查询期间的零堆分配进行了优化(请参阅下面的基准)。
- 优化批量更新/删除,事务期间的更新需要。
12ns
- 支持支持 SIMD 的聚合函数,例如“总和”、“平均值”、“最小值”和“最大值”。
- 通过利用位图索引支持支持 SIMD 的过滤(即“where”子句)。
- 支持列式投影(即“选择”子句)以实现快速检索。
- 支持基于提供的谓词动态计算的计算索引。
- 支持使用分片闩锁进行并发更新,以保持快速。
- 支持事务隔离,允许您创建事务和提交/回滚。
- 支持基于生存时间或过期列的行过期。
- 支持以事务方式对任何值进行原子合并。
- 支持无法使用偏移量的用例的主键。
- 支持一致地流式传输所有提交的更改数据流。
- 支持并发快照,允许将整个集合存储到文件中。
文档
一般的想法是利用缓存友好的方法来组织数组结构(SoA)中的数据,否则在数据库设计中称为“列式”存储。这反过来又使我们能够非常有效地迭代和过滤列。最重要的是,该软件包还将位图索引添加到列式存储中,允许使用二进制、 和构建过滤器查询(请参阅 kelindar/位图与 SIMD 支持)。and
and not
or
xor
集合和列
若要将数据导入存储,需要首先创建 by 调用方法。每个集合都需要一个模式,需要通过多次调用来指定,或者通过调用函数从对象自动推断。在下面的示例中,我们创建了一个包含几列的新集合。Collection
NewCollection()
CreateColumn()
CreateColumnsOf()
// Create a new collection with some columns players := column.NewCollection() players.CreateColumn("name", column.ForString()) players.CreateColumn("class", column.ForString()) players.CreateColumn("balance", column.ForFloat64()) players.CreateColumn("age", column.ForInt16())
现在我们已经创建了一个集合,我们可以通过在集合上使用方法插入单个记录。在此示例中,我们将插入一行并手动指定值。请注意,此函数返回一个指示插入行的行索引。Insert()
index
index, err := players.Insert(func(r column.Row) error { r.SetString("name", "merlin") r.SetString("class", "mage") r.SetFloat64("balance", 99.95) r.SetInt16("age", 107) return nil })
虽然前面的示例演示了如何插入单行,但以这种方式插入多行效率相当低下。这是因为直接在集合上的每个调用都会启动一个单独的事务,并且与之相关的性能成本很小。如果要执行批量插入并更快地插入许多值,可以通过调用事务来完成,如以下示例所示。请注意,唯一的区别是通过调用方法并在事务上调用方法而不是在集合上调用方法来实例化事务。Insert()
Insert()
Query()
txn.Insert()
players.Query(func(txn *column.Txn) error { for _, v := range myRawData { txn.Insert(...) } return nil // Commit })
查询和索引
存储允许您根据是否存在某些属性或其值来查询数据。在下面的示例中,我们将查询我们的集合,并对事务应用过滤操作 bu using 方法。此方法扫描值并检查某个谓词的计算结果是否为 。在这种情况下,我们将扫描所有玩家并查找他们的 ,如果他们的职业等于“流氓”,我们将接受它。最后,我们调用仅计算结果集的方法。WithValue()
true
class
Count()
// This query performs a full scan of "class" column players.Query(func(txn *column.Txn) error { count := txn.WithValue("class", func(v interface{}) bool { return v == "rogue" }).Count() return nil })
现在,如果我们需要经常执行此查询怎么办?可以简单地创建一个具有相同谓词的索引,并在每次 (a) 将对象插入集合和 (b) 更新依赖列的值时应用此计算。让我们看看下面的例子,我们正在创建一个依赖于“类”列的索引。此索引应用相同的谓词,该谓词仅在类为“恶意”时才返回。然后,我们可以通过简单地调用方法并提供索引名称来查询它。rogue
true
With()
索引本质上类似于布尔列,因此从技术上讲,您也可以在查询时选择它的值。现在,在此示例中,查询的执行速度会更快,因为它在后台对“恶意”索引使用位图索引,并在查询时对两个位图执行简单的逻辑操作。这样可以避免在 期间对谓词进行整个扫描和应用。10-100x
AND
Query
// Create the index "rogue" in advance out.CreateIndex("rogue", "class", func(v interface{}) bool { return v == "rogue" }) // This returns the same result as the query before, but much faster players.Query(func(txn *column.Txn) error { count := txn.With("rogue").Count() return nil })
查询可以进一步扩展,因为它允许索引和操作。这允许您对集合提出更复杂的问题。在下面的示例中,假设我们在列上有一堆索引,我们想问不同的问题。intersection
difference
union
class
首先,让我们尝试通过使用名为 same 的方法应用操作来合并两个查询。在这里,我们首先只选择盗贼,然后将它们与法师合并在一起,从而选择同时包含盗贼和法师。Union()
// How many rogues and mages? players.Query(func(txn *column.Txn) error { txn.With("rogue").Union("mage").Count() return nil })
接下来,让我们计算每个不是流氓的人,为此我们可以使用一种对集合执行差异(即二进制操作)的方法。这将导致集合中除盗贼之外的所有玩家的计数。Without()
AND NOT
// How many rogues and mages? players.Query(func(txn *column.Txn) error { txn.Without("rogue").Count() return nil })
现在,您可以组合所有方法并继续构建更复杂的查询。同时查询索引和非索引字段时,请务必知道这一点,因为每次扫描将仅应用于所选内容,从而加快查询速度。因此,如果您在选择50%玩家的特定索引上有一个过滤器,然后对其进行扫描(例如),它只会扫描50%的用户,因此速度会快2倍。WithValue()
// How many rogues that are over 30 years old? players.Query(func(txn *column.Txn) error { txn.With("rogue").WithFloat("age", func(v float64) bool { return v >= 30 }).Count() return nil })
迭代结果
在前面的所有示例中,我们只执行计算结果集中元素数的操作。在本节中,我们将介绍如何迭代结果集。Count()
和以前一样,需要使用集合上的方法启动事务。之后,我们可以调用允许我们迭代事务中的结果集的方法。请注意,它可以像预期的那样紧跟在方法之后。Query()
txn.Range()
With..()
为了访问迭代的结果,在调用方法之前,我们需要首先使用诸如 、 等方法加载我们需要的列读取器。它们准备在迭代时执行高效查找所需的读/写缓冲区。Range()
txn.String()
txn.Float64()
在下面的示例中,我们从集合中选择所有流氓,并使用该方法打印出他们的名字,并使用调用方法创建的列读取器访问“name”列。Range()
txn.String("name")
players.Query(func(txn *column.Txn) error { names := txn.String("name") // Create a column reader return txn.With("rogue").Range(func(i uint32) { name, _ := names.Get() println("rogue name", name) }) })
同样,如果需要访问更多列,只需创建适当的列读取器并使用它们,如前面的示例所示。
players.Query(func(txn *column.Txn) error { names := txn.String("name") ages := txn.Int64("age") return txn.With("rogue").Range(func(i uint32) { name, _ := names.Get() age, _ := ages.Get() println("rogue name", name) println("rogue age", age) }) })
采用(数字)列读取器将考虑事务的当前过滤索引。Sum()
players.Query(func(txn *column.Txn) error { totalAge := txn.With("rouge").Int64("age").Sum() totalRouges := int64(txn.Count()) avgAge := totalAge / totalRouges txn.WithInt("age", func(v float64) bool { return v < avgAge }) // get total balance for 'all rouges younger than the average rouge' balance := txn.Float64("balance").Sum() return nil })
排序索引
除了位图索引之外,集合还支持一致排序的索引。这些索引是暂时性的,必须在集合加载快照时重新创建。
在下面的示例中,我们创建一个 SortedIndex 对象,并使用它来对事务中的过滤记录进行排序。
// Create the sorted index "sortedNames" in advance out.CreateSortIndex("richest", "balance") // This filters the transaction with the `rouge` index before // ranging through the remaining balances by ascending order players.Query(func(txn *column.Txn) error { name := txn.String("name") balance := txn.Float64("balance") txn.With("rogue").Ascend("richest", func (i uint32) { // save or do something with sorted record curName, _ := name.Get() balance.Set(newBalance(curName)) }) return nil })
更新值
为了更新集合中的某些项,您可以简单地调用方法并使用列访问器 or 方法以原子方式更新某个列的值。鉴于我们的商店支持交易,更新不会立即反映出来。仅当提交事务时,更新才会应用于集合,从而允许隔离和回滚。Range()
Set()
Add()
在下面的示例中,我们选择所有流氓,并将他们的余额和年龄更新为特定值。事务返回,因此当方法返回时将自动提交。nil
Query()
players.Query(func(txn *column.Txn) error { balance := txn.Float64("balance") age := txn.Int64("age") return txn.With("rogue").Range(func(i uint32) { balance.Set(10.0) // Update the "balance" to 10.0 age.Set(50) // Update the "age" to 50 }) })
在某些情况下,您可能希望以原子方式递增或递减数值。为此,您可以使用提供的操作。请注意,索引也将相应地更新,并使用最新值重新评估谓词。在下面的示例中,我们将所有流氓的余额原子地增加 500。Merge()
players.Query(func(txn *column.Txn) error { balance := txn.Float64("balance") return txn.With("rogue").Range(func(i uint32) { balance.Merge(500.0) // Increment the "balance" by 500 }) })
虽然数值的原子递增/递减相对简单,但可以使用选项指定此操作,也可以用于其他数据类型,例如字符串。在下面的示例中,我们将创建一个合并函数,该函数将两个字符串连接在一起,当调用时,会自动附加新字符串。Merge()
WithMerge()
MergeString()
// A merging function that simply concatenates 2 strings together concat := func(value, delta string) string { if len(value) > 0 { value += ", " } return value + delta } // Create a column with a specified merge function db := column.NewCollection() db.CreateColumn("alphabet", column.ForString(column.WithMerge(concat))) // Insert letter "A" db.Insert(func(r column.Row) error { r.SetString("alphabet", "A") // now contains "A" return nil }) // Insert letter "B" db.QueryAt(0, func(r column.Row) error { r.MergeString("alphabet", "B") // now contains "A, B" return nil })
即将过期的值
有时,当您不再需要某些行时,自动删除它们很有用。为此,库会自动为每个新集合添加一列,并按时启动清理 goroutine,该例程定期运行并清理过期的对象。为了设置这一点,您可以简单地在集合上使用允许插入具有定义生存时间持续时间的对象的方法。expire
Insert...()
在下面的示例中,我们将一个对象插入到集合中,并将生存时间设置为距当前时间的 5 秒。在此时间之后,对象将自动从集合中逐出,并且可以回收其空间。
players.Insert(func(r column.Row) error { r.SetString("name", "Merlin") r.SetString("class", "mage") r.SetTTL(5 * time.Second) // time-to-live of 5 seconds return nil })
有趣的是,由于自动添加到每个集合的列是实际的普通列,因此您可以查询甚至更新它。在下面的示例中,我们使用该方法查询并将生存时间延长 1 小时。expire
Extend()
players.Query(func(txn *column.Txn) error { ttl := txn.TTL() return txn.Range(func(i uint32) { ttl.Extend(1 * time.Hour) // Add some time }) })
事务提交和回滚
事务允许在两个并发操作之间进行隔离。事实上,所有批处理查询都必须通过此库中的事务。该方法需要一个函数,该函数接受一个指针,该指针包含支持查询的各种帮助程序方法。在下面的示例中,我们尝试遍历所有玩家,并通过将其设置为 .如果函数返回且没有任何错误,该方法将自动调用。另一方面,如果提供的函数返回错误,查询将自动调用,因此不会应用任何更改。Query
column.Txn
10.0
Query
txn.Commit()
txn.Rollback()
// Range over all of the players and update (successfully their balance) players.Query(func(txn *column.Txn) error { balance := txn.Float64("balance") txn.Range(func(i uint32) { v.Set(10.0) // Update the "balance" to 10.0 }) // No error, transaction will be committed return nil })
现在,在此示例中,我们尝试更新余额,但查询回调返回错误,在这种情况下,基础集合中实际上不会反映任何更新。
// Range over all of the players and update (successfully their balance) players.Query(func(txn *column.Txn) error { balance := txn.Float64("balance") txn.Range(func(i uint32) { v.Set(10.0) // Update the "balance" to 10.0 }) // Returns an error, transaction will be rolled back return fmt.Errorf("bug") })
使用主键
在某些情况下,通过主键而不是由集合内部生成的索引来访问特定行很有用。对于此类用例,该库提供了列类型,该列类型支持通过用户定义的主键进行无缝查找。在下面的示例中,我们使用具有列类型的方法创建具有主键的集合。然后,我们使用方法插入一个值。Key
name
CreateColumn()
ForKey()
InsertKey()
players := column.NewCollection() players.CreateColumn("name", column.ForKey()) // Create a "name" as a primary-key players.CreateColumn("class", column.ForString()) // .. and some other columns // Insert a player with "merlin" as its primary key players.InsertKey("merlin", func(r column.Row) error { r.SetString("class", "mage") return nil })
同样,您可以使用主键直接查询该数据,而无需知道确切的偏移量。请注意,使用主键会产生开销,因为它需要使用内部管理的哈希表查找偏移量的额外步骤。
// Query merlin's class players.QueryKey("merlin", func(r column.Row) error { class, _ := r.String("class") return nil })
存储二进制记录
如果您发现自己需要将更复杂的结构编码为单个列,则可以使用 function 来实现。这允许您指定将自动编码为单个列的 / 类型。在下面的示例中,我们将创建一个实现所需方法的类型。column.ForRecord()
BinaryMarshaler
BinaryUnmarshaler
Location
type Location struct { X float64 `json:"x"` Y float64 `json:"y"` } func (l Location) MarshalBinary() ([]byte, error) { return json.Marshal(l) } func (l *Location) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, l) }
现在我们有了记录实现,我们可以使用 function 为此结构创建一个列,如下所示。ForRecord()
players.CreateColumn("location", ForRecord(func() *Location { return new(Location) }))
为了操作记录,我们可以使用适当的 , 方法,类似于其他列类型。Record()
SetRecord()
Row
// Insert a new location idx, _ := players.Insert(func(r Row) error { r.SetRecord("location", &Location{X: 1, Y: 2}) return nil }) // Read the location back players.QueryAt(idx, func(r Row) error { location, ok := r.Record("location") return nil })
流式处理更改
此库还支持在发生所有事务提交时一致地流式传输它们。这允许您实现自己的变更数据捕获 (CDC) 侦听器,将数据流式传输到 kafka 或远程数据库以实现持久性。为了启用它,您只需在创建集合期间提供接口的实现。commit.Logger
在下面的示例中,我们利用了 a 的实现,它只是将提交发布到 go 通道中。在这里,我们创建一个缓冲通道,并使用单独的 goroutine 继续使用提交,允许我们在存储中发生事务时查看它们。commit.Channel
commit.Logger
// Create a new commit writer (simple channel) and a new collection writer := make(commit.Channel, 1024) players := NewCollection(column.Options{ Writer: writer, }) // Read the changes from the channel go func(){ for commit := range writer { fmt.Printf("commit %v\n", commit.ID) } }() // ... insert, update or delete
另外,此更改流保证是一致的和序列化的。这意味着您还可以在另一个数据库上复制这些更改并同步两者。事实上,这个库还提供了允许这样做的集合方法。在下面的示例中,我们创建了两个集合,并使用该方法与更改流一起异步复制所有提交。Replay()
primary
replica
primary
replica
Replay()
// Create a primary collection writer := make(commit.Channel, 1024) primary := column.NewCollection(column.Options{ Writer: &writer, }) primary.CreateColumnsOf(object) // Replica with the same schema replica := column.NewCollection() replica.CreateColumnsOf(object) // Keep 2 collections in sync go func() { for change := range writer { replica.Replay(change) } }()
快照和还原
在事务运行时,还可以以单个二进制格式保存集合。这允许您定期计划备份或确保在应用程序终止时保留所有数据。
若要拍摄快照,必须首先创建有效的目标,然后在集合上调用该方法以创建快照,如以下示例所示。io.Writer
Snapshot()
dst, err := os.Create("snapshot.bin") if err != nil { panic(err) } // Write a snapshot into the dst err := players.Snapshot(dst)
相反,若要还原现有快照,需要先打开集合,然后调用该方法。请注意,集合及其架构必须已经初始化,因为我们的快照本身不携带此信息。io.Reader
Restore()
src, err := os.Open("snapshot.bin") if err != nil { panic(err) } // Restore from an existing snapshot err := players.Restore(src)
例子
可以在此存储库的示例目录中找到此库的多个完整使用示例。
基准
下面的基准测试是在包含十几列的 100,000 个项目的集合上运行的。随意探索基准测试,但我强烈建议在您的实际数据集上测试它。
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkCollection/insert-8 2523 469481 ns/op 24356 B/op 500 allocs/op
BenchmarkCollection/select-at-8 22194190 54.23 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 2068 568953 ns/op 122 B/op 0 allocs/op
BenchmarkCollection/count-8 571449 2057 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 28660 41695 ns/op 3 B/op 0 allocs/op
BenchmarkCollection/update-at-8 5911978 202.8 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 1280 946272 ns/op 3726 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 6405852 188.9 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 2073188 562.6 ns/op 0 B/op 0 allocs/op
在测试较大的集合时,我添加了一个小示例(请参阅文件夹)并在插入 20 万行的情况下运行它,每个条目有 12 列和 4 个需要计算的索引,以及围绕它们的一些查询和扫描。examples
running insert of 20000000 rows...
-> insert took 20.4538183s
running snapshot of 20000000 rows...
-> snapshot took 2.57960038s
running full scan of age >= 30...
-> result = 10200000
-> full scan took 61.611822ms
running full scan of class == "rogue"...
-> result = 7160000
-> full scan took 81.389954ms
running indexed query of human mages...
-> result = 1360000
-> indexed query took 608.51µs
running indexed query of human female mages...
-> result = 640000
-> indexed query took 794.49µs
running update of balance of everyone...
-> updated 20000000 rows
-> update took 214.182216ms
running update of age of mages...
-> updated 6040000 rows
-> update took 81.292378ms
贡献
我们对贡献持开放态度,请随时提交拉取请求,我们将尽快对其进行审核。该图书馆由罗马·阿塔奇安特人维护
许可证
磁贴根据 MIT 许可证获得许可。