本申请是于2019年11月4日提交的美国专利申请No.16/672,939的继续申请,其公开内容特此通过引用并入本文。
技术领域
本公开的技术通常涉及一种用于改进涉及许多宿的混洗(shuffle)操作的效率的系统。在“混洗”中,来自多个源的数据块使用分发方案来在多个宿当中重新分发,该分发方案使每个源中的数据块被分发到多个宿。在混洗结束时,每个宿可以包括来自不止一个源的块。
背景技术
混洗数据被照惯例按其源组织并且映射到其每个宿(sink)的对应源。图1是示出混洗操作的示例的功能框图,其中存储在源10处的数据块被混洗到宿30。在图1的示例中,存在十四个源和十六个宿。每个宿被映射到并接收来自四个不同源的数据。例如,宿31和32中的每个宿被映射成接收来自源11、12、13和14的混洗数据。又如,宿33和34中的每个宿被映射到并接收被从源11和12映射到源13、从源13映射到源15并且从源15映射到宿33和34的混洗数据。对图1的混洗来说在宿与源之间存在总共64个映射——针对十六个宿中的每个宿有四个源。
照惯例,混洗操作可能需要每个源将其数据附加到公共日志。因此,混洗操作能够容易地缩放以适应附加源,并且用于完成混洗的操作的数目可以随着源的数目增加而线性地增加。然而,由于宿从多个源接收数据并因此被映射到多个不同源,所以每个宿必须扫描它可以从其接收数据的所有源。因此混洗操作不容易缩放以适应附加宿,因为用于完成混洗的操作的数目可能随着宿的数目增加而二次增加。随着在混洗操作中处理的数据量增加,数据可能不再适应有限数目的宿,所以变得有必要增加数据被再划分到的宿的数目。
技术实现要素:
本公开的一个方面涉及一种在分布式网络中再划分数据的方法。方法可以包括:由一个或多个处理器执行数据集的从多个第一源到多个第一宿的第一传递,每个第一宿从第一源中的一个或多个收集数据;以及由一个或多个处理器执行数据集的从多个第二源到多个第二宿的第二传递,多个第一宿中的每个第一宿对应于多个第二源中的一个,并且每个第二宿从第二源中的一个或多个收集数据。执行第一传递和第二传递可以使数据集被再划分,使得一个或多个第二宿收集源自第一源中的两个或更多个的数据。
在一些示例中,多个第一宿的数量可以大于多个第一源的数量。
在一些示例中,每个第一宿可以从第一源中的两个或更多个收集数据。
在一些示例中,多个第二宿的数量可以大于多个第二源的数量。
在一些示例中,方法可以进一步包括执行N个传递,N是具有大于二的值的数字。对于每个给定传递,多个宿可以从多个源中的一个或多个源收集数据,每个源对应于先前传递的宿。执行N个传递可以使数据集被再划分,使得一个或多个第N宿收集源自第一源中的两个或更多个的数据。
在一些示例中,对于N个传递中的至少一个传递,传递的每个宿可以从传递的源中的两个或更多个收集数据,并且传递中的两个或更多个源中的每个源可以包括源自前一个传递的不同源的数据。
在一些示例中,对于N个传递中的至少另一传递,传递的每个宿可以从传递的源中的两个或更多个收集数据,并且传递的两个或更多个源中的每个源可以包括源自前一个传递的不同源的数据。
在一些示例中,至少一个传递和至少另一传递可以是N个传递中的连续传递。
在一些示例中,多个第一宿中的每个第一宿可以是多个第二源中的对应一个第二源。方法可以进一步包括确定第一传递的完成并且在确定第一传递完成时发起第二传递。
在一些示例中,多个第一宿中的每个第一宿可以是多个第二源中的对应一个第二源,并且方法可以进一步包括:在第一传递之前,指定多个第一宿和多个第二宿中的每个,由此对多个第二宿的指定避免在第一传递期间在多个第二宿处收集来自多个第一源的数据;以及在至少一个第一宿完成从第一源中的一个或多个收集时并且在第一传递完成之前,指定要从至少一个第一宿收集的一个或多个第二宿。
在一些示例中,数据集可以使用第一散列函数被从多个第一源传递到多个第一宿,并且使用可能与第一散列函数相关的第二散列函数被从多个第二源传递到多个第二宿。
在一些示例中,多个第二宿的标识可以是连续的,并且对于每个第二宿,方法可以包括基于第二宿的标识来计算第二宿从其收集的第二源的标识的范围。
在一些示例中,方法可以进一步包括由一个或多个处理器冲洗第一传递的混洗日志,并且在冲洗完成之前,对数据集的已被从第一传递的混洗日志中冲洗的分段执行第二传递。
在一些示例中,执行第二传递可以包括执行多个混洗操作。第二源的第一部分可以被包括在第一数据日志中,而第二源的第二部分可以被包括在第二数据日志中。
在一些示例中,多个混洗中的每个混洗可以由一个或多个处理器并行执行,并且可以并行从第二源的第一部分和第二部分中冲洗包括在第一数据日志和第二数据日志两者中的数据。
本公开的另一方面涉及一种用于在分布式网络中再划分数据的系统,包括一个或多个处理器以及与一个或多个处理器通信的一个或多个存储设备。一个或多个存储设备可以包含指令,该指令被配置成使一个或多个处理器执行数据集的从多个第一源到多个第一宿的第一传递,每个第一宿从第一源中的一个或多个收集数据,并且执行数据集的从多个第二源到多个第二宿的第二传递,多个第一宿中的每个第一宿对应于多个第二源中的一个,并且每个第二宿从第二源中的一个或多个收集数据。第一传递和第二传递可以使数据集被再划分,使得一个或多个第二宿收集源自第一源中的两个或更多个的数据。
在一些示例中,每个第一宿可以从第一源中的两个或更多个收集数据,并且多个第二宿的数量可以大于多个第二源的数量。
在一些示例中,指令可以被配置成使一个或多个处理器执行N个传递,N是具有大于二的值的数字。对于每个给定传递,多个宿可以从多个源中的一个或多个源收集数据,每个源对应于先前传递的宿。执行N个传递可以使数据集被再划分,使得一个或多个第N宿收集源自第一源中的两个或更多个的数据。
在一些示例中,对于N个传递中的至少一个传递,传递的每个宿可以从传递的源中的两个或更多个收集数据,并且传递的两个或更多个源中的每个源包括源自前一个传递的不同源的数据。对于N个传递中的至少一个其他传递,传递的每个宿可以从其他传递的源中的两个或更多个收集数据,并且其他传递的两个或更多个源中的每个源可以包括源自前一个传递的不同源的数据。至少一个传递和至少一个其他传递可以是N个传递中的连续传递。
附图说明
图1是图示示例在先数据分发方案的框图。
图2A是根据本公开的方面的分布式数据存储内的示例系统的框图。
图2B是根据本公开的方面的示例系统和计算设备的框图。
图3是表示根据本公开的方面的示例多传递混洗的流程图。
图4至7是图示根据本公开的方面的示例数据分发方案的框图。
图8和图9是图示根据本公开的方面的示例工作流程的框图。
具体实施方式
概述
为了随宿的数目增加而缩放混洗操作,本公开将混洗操作划分成多传递(pass)。以这种方式,由每个宿读取的源的数目减少了,从而减轻每个宿的混洗操作的开销量。能够以不同方式实现多传递混洗操作并且每个实现方式可以以不同方式改进效率。
在一些实现方式中,多传递混洗的至少一个传递可以是“宿拆分”。在宿拆分中,在多个宿当中分发来自每个给定源的数据,但是每个宿从仅一个源读取数据。多传递混洗的一个传递中的宿拆分可以允许使用更少的宿来执行多传递混洗的其他传递。
例如,如果期望将数据从100,000个源分发到100,000个宿,则可以首先将数据传递到1,000个宿,然后可以将1,000个宿中的每个宿的数据分开地拆分在100个宿当中,从而在拆分之后产生总共100,000个宿。在第一传递中,1000个宿中的每个宿可以从一定数目的源读取数据。由于此传递是混洗并且每个宿从多个源读取数据,所以将此传递中的宿的数目减少100倍显著地降低传递的开销。在后续拆分中,每个宿从仅单个源读取,这也需要显著更少的开销。结果,由宿读取的源的数目在两个传递中减少了。
附加地或替换地,可以执行拆分作为多传递混洗的第一传递。如果混洗包括来自许多源的数据,则及早拆分数据可能更可取。例如,如果有1,000,000个源要分发到2,000,000个宿,则可以首先将来自1,000,000个源的数据各自拆分到2个宿中。然后可以将所得的2,000,000个宿分成2,000个组,每个组从1,000个不同源取数据并且将数据混洗到1,000个宿。第一混洗的宿可以被重新分组并且然后以类似方式混洗,从而导致每个原始源被相对彻底地分发。这种形式的分组和重新混洗在本文中被称为“蝶式混洗”。与在先示例一样,每个传递中的开销显著地减少了。蝶式混洗的第一传递的开销减少了,因为它是拆分,由此宿从仅一个源读取。因为1,000个源与1,000个宿之间的每个个别混洗需要比1,000,000个源与1,000,000个宿之间的混洗二次更少的开销,所以后续传递中的开销减少了。换句话说,尽管在每个传递中执行的混洗的总数线性地增加,但是与单个1,000,000至1,000,000混洗相比每个混洗的处理二次减少。
在蝶式混洗的以上示例中,从一个传递到下一传递执行的独立混洗的数目保持不变。在其他示例多传递混洗中,从一个传递到下一传递执行的混洗的数目可能倍增。例如,将数据从10,000个源混洗到20,000个宿的初始传递可能后面是具有10,000个源到20,000个宿的两个独立混洗的第二传递。第二传递的结果会将数据分发在40,000个宿当中。第二传递的每个独立混洗可以具有它自己的被混洗的数据块的元数据的日志。这对于增加通过整体混洗操作处理的宿的数目可能更可取,因为每个日志包含仅读取个别混洗的块而不是整个传递的所有块的那些宿的元数据。
如果需要在混洗期间从日志中冲洗元数据,则多传递混洗的使用可以改进冲洗操作的效率。这是因为能够控制多传递混洗的第二传递以仅分发已被从第一混洗中冲洗的数据。此外,在一些情况下,使用多传递来完成混洗可以完全避免从日志中冲洗元数据的需要,因为对于给定分段减少宿目的地的总数也减少为分段存储的元数据的总量。
在一些情况下,可以通过推迟后续传递的宿的划分直到稍后的时间来将多传递混洗实现为单个混洗。例如,在通过在第一传递中使用1,000个宿来将数据从100,000个源混洗到100,000个宿的示例中,可以推迟剩余99,000个宿的划分。这防止100,000个源直接写入到那99,000个宿。在一种情况下,可以阻塞99,000个宿直到整个第一传递完成为止。在另一情况下,下一传递可以在第一传递结束之前开始,但是可以改变下一传递的宿的划分方案以防止源直接写入到它们。
以上实现方式能够改进混洗性能,并且特别是当缩放超过数万个宿时。例如,测试结果已表明,通过对1,000个宿执行第一传递并且对100,000个宿执行第二次拆分传递,能够使将大约2TiB的数据从100,000个源混洗到100,000个宿的速度提高一倍以上。这导致执行混洗所需要的资源、成本和时间显著减少。
示例系统
图2A图示包括分布式计算环境的示例系统。多个数据中心160、170、180可以例如通过网络150通信地耦合。数据中心160、170、180可以进一步通过网络150与一个或多个客户端设备(诸如客户端110)进行通信。因此,例如,客户端110可以在“云”中执行操作。在一些示例中,数据中心160、170、180可以进一步与控制器190进行通信。
每个客户端110可以是意在供人使用的个人计算机或移动设备,其具有通常在个人计算机中找到的所有内部组件,诸如中央处理单元(CPU)、CD-ROM、硬盘驱动器和显示设备(例如,具有屏幕的监视器、投影仪、触摸屏、小LCD屏幕、电视或另一设备,诸如能够可操作来显示由处理器120处理的信息的电子设备)、扬声器、调制解调器和/或网络接口设备、用户输入设备(诸如鼠标、键盘、触摸屏或麦克风),以及用于将这些元件彼此连接的所有组件。此外,依照本文描述的系统和方法的计算机可以包括能够处理指令并且向人类和其他计算机传送数据以及从人类和其他计算机传送数据的设备,包括通用计算机、PDA、平板、移动电话、智能手表、缺少本地存储能力的网络计算机、电视用机顶盒和其他联网设备。
客户端110可以包含处理器120、存储器130和通常存在于通用计算机中的其他组件。存储器130能够存储可由处理器120访问的信息,包括能够由处理器120执行的指令132。存储器还能够包括能够由处理器120检索、操纵或存储的数据134。存储器130可以是一种能够存储可由处理器120访问的信息的非暂时性计算机可读介质,诸如硬盘驱动器、固态驱动器、磁带驱动器、光学存储装置、存储卡、ROM、RAM、DVD、CD-ROM、可写和只读存储器。处理器120可以是公知处理器或其他鲜为人知类型的处理器。替换地,处理器120可以是诸如ASIC的专用控制器。
指令132可以是由处理器120直接执行的指令集,诸如机器代码,或由处理器120间接执行的指令集,诸如脚本。在这方面,能够在本文中互换地使用术语“指令”、“步骤”和“程序”。指令132能够被以用于由处理器120直接处理的目标代码格式存储,或者以其他类型的计算机语言存储,该其他类型的计算机语言包括脚本或按需解释或提前编译的独立源代码模块的集合。
数据134能够由处理器120依照指令132来检索、存储或修改。例如,尽管系统和方法不受特定数据结构限制,但是数据134能够被存储在计算机寄存器中,作为具有多个不同字段和记录的结构被存储在数据存储中,或者被存储在文档或缓冲区中。还能够以诸如但不限于二进制值、ASCII或Unicode的计算机可读格式格式化数据134。此外,数据134能够包括足以标识相关信息的信息,诸如数字、描述性文本、专有代码、指针、对存储在其他存储器(包括其他网络位置)中的数据的引用,或由函数使用来计算相关数据的信息。
尽管图2A在功能上将处理器120和存储器130图示为在同一块内,但是处理器120和存储器130可以实际上包括多个处理器和存储器,该多个处理器和存储器可以或可能未被存储在同一物理外壳内。例如,一些指令132和数据134能够被存储在可移动CD-ROM上,而其他指令和数据能够被存储在只读计算机芯片内。一些或所有指令和数据能够被存储在物理上远离处理器120然而仍可由处理器120访问的位置中。类似地,处理器120能够实际上包括可以或可能不并行操作的处理器的集合。
数据中心160至180可以被定位为彼此相距相当远的距离。例如,数据中心可以被定位在世界各地的各个国家中。每个数据中心160、170、180可以包括一个或多个计算设备,诸如处理器、服务器、分片等。例如,如图2A所示,数据中心160包括计算设备162、164,数据中心170包括计算设备172,并且数据中心180包括计算设备181至186。根据一些示例,计算设备可以包括在主机上运行的一个或多个虚拟机。例如,计算设备162可以是支持运行操作系统和应用的多个虚拟机166、167的主机。虽然在图2A中图示了仅几个虚拟机166、167,但是应该理解,任何数目的虚拟机可以由任何数目的主机计算设备支持。此外,应该理解,图2A中图示的配置仅仅是示例,并且示例数据中心160至180中的每个数据中心中的计算设备可以具有可以彼此相同或不同的各种结构和组件。
可以跨越这些计算设备执行程序,例如,使得一些操作由第一数据中心的一个或多个计算设备执行,然而其他操作由第二数据中心的一个或多个计算设备执行。在一些示例中,各种数据中心中的计算设备可以具有不同的容量。例如,不同的计算设备可以具有不同的处理速度、工作负载等。虽然示出了仅这些计算设备中的几个,但是应该理解,每个数据中心160、170、180可以包括任何数目的计算设备,并且第一数据中心中的计算设备的数目可以不同于第二数据中心中的计算设备的数目。此外,应该理解,每个数据中心160至180中的计算设备的数目可以随着时间的推移(例如随着硬件被移除、替换、升级或扩展)而变化。
在一些示例中,每个数据中心160至180还可以包括许多存储设备(未示出),诸如硬盘驱动器、随机存取存储器、磁盘、磁盘阵列、磁带驱动器或任何其他类型的存储设备。数据中心160至180可以实现许多架构和技术中的任一种,包括但不限于直接附连存储(DAS)、网络附连存储(NAS)、存储区域网络(SAN)、光纤通道(FC)、以太网光纤通道(FCoE)、混合架构网络等。除了存储设备之外,数据中心还可以包括许多其他设备,诸如电缆线路、路由器等。此外,在一些示例中数据中心160至180可以是虚拟化环境。此外,虽然示出了仅几个数据中心160至180,但是许多数据中心可以通过网络150和/或附加网络耦合。
在一些示例中,控制器190可以与数据中心160至180中的计算设备进行通信,并且可以促进程序的执行。例如,控制器190可以跟踪每个计算设备的容量、状态、工作负载或其他信息,并且使用此类信息来指派任务。类似于上述客户端110,控制器190可以包括处理器198以及包括数据194和指令196的存储器192。控制器190可以被配置成重新分发或再划分存储在数据中心160至180中的计算设备当中的数据。
客户端110、数据中心160至180和控制器190可以能够例如通过网络150进行直接和间接通信。例如,使用因特网套接字,客户端110能够通过因特网协议套件连接到在远程服务器上运行的服务。服务器能够设置可以接受用于发送和接收信息的发起连接的侦听套接字。网络150和中间节点可以包括各种配置和协议,包括因特网、万维网、内联网、虚拟专用网络、广域网、局域网、使用一个或多个公司专有的通信协议的专用网络、以太网、WiFi(例如,702.71、702.71b、g、n或其他此类标准)以及RPC、HTTP和上述各项的各种组合。这种通信可以由能够向其他计算机传送数据和从其他计算机传送数据的设备(诸如调制解调器(例如,拨号、电缆或光纤)和无线接口)促进。
客户端110可以请求访问存储在数据中心160至180的计算设备中的数据。这种请求可以由控制器190和/或数据中心160至180中的一个或多个计算设备来处理。在一些示例中,对请求的响应可以涉及或以其他方式需要数据的操纵,诸如使用本文更详细地描述的操作。
图2B是图示依照本公开的具有用于执行混洗操作的一个或多个计算设备200的示例系统的框图。计算设备200可以被包括在分布式数据处理系统(诸如数据中心160至180中的一个的计算设备)或控制器190中,如图2A所示。计算设备200可以被配置成通过再划分数据来对大量数据运行复杂查询,诸如“JOIN”和“GROUP BY”操作。可以响应于查询而执行此类操作。在一些示例中,查询可以由客户端计算设备接收。在一些示例中,为了执行从客户端计算设备接收到的其他指令或查询,可能需要查询。
分布式数据处理系统中的数据可以被短暂地存储,诸如在分布式存储器内文件系统或磁盘中,或者通过两者的任何组合来存储。可以将数据从许多数据源A1至AN混洗到许多数据宿B1至BN。出于在再划分期间跟踪数据的目的,可以为源和宿指配虚拟地址。在一些示例中,可以将数据存储在虚拟机(诸如由图2A的数据中心160至180托管的虚拟机166至167)中。
计算设备200可以包括一个或多个处理器210、服务器、分片、单元等。应该理解,每个计算设备可以包括任何数目的处理器或计算设备,计算设备中的此类设备的数目可以随着时间的推移(例如随着硬件)被移除、替换、升级或扩展而变化。
计算设备200还可以包括许多存储设备或存储器220,诸如硬盘驱动器、随机存取存储器、磁盘、磁盘阵列、磁带驱动器或任何其他类型的存储设备。计算设备200可以实现许多架构和技术中的任一种,包括但不限于直接附连存储(DAS)、网络附连存储(NAS)、存储区域网络(SAN)、光纤通道(FC)、以太网光纤通道(FCoE)、混合架构网络等。除了存储设备之外,计算设备200还可以包括许多其他设备,诸如用于实现计算设备之间的输入和输出的通信设备250,其诸如电缆线路、路由器等。
每个计算设备的存储器220能够存储可由一个或多个处理器210访问的信息,包括在计算设备200处接收或由计算设备200生成的数据230,以及能够由一个或多个处理器210执行的指令240。
数据230可以包括跟踪系统的源A1至AN和宿B1至BN之间的混洗操作的混洗日志232。混洗日志232可以包括在混洗操作中混洗的数据分段234、236的细节,诸如个别数据分段238的元数据,包括关于分段长度的信息和关于每个分段的提交状态。在概念上,混洗数据可以是作为源与其相应的目的地宿之间的映射的表示。
如在下面更详细讨论的,可以将混洗操作划分成从第一传递到第N传递的多传递。因此,数据分段细节在图2B的示例混洗日志232中被示出为被分开地存储。每个传递对细节的单独跟踪可以帮助管理数据分段细节234、236和元数据238的存储。例如,即使后续传递正在进行,一旦第一传递已完成并且数据已被存储在其目的地宿的日志文件中,就可以从存储器220中冲洗第一传递234的细节。在其他情况下,在混洗日志232与宿的数据日志之间可能存在重叠。
指令240可以包括被配置成控制数据混洗的操作的混洗控制程序242。指令240还可以包括被配置成管理混洗日志232的数据分段细节234、236和元数据238的存储的混洗日志冲洗程序244。在下面更详细地讨论存储的数据和程序的以上示例。
示例方法
图3是图示将来自许多源的数据混洗到多个宿的示例多传递混洗的流程图300。流程图中的操作可以由计算设备200的处理器(诸如图2B的一个或多个处理器210)执行,该一个或多个处理器210与混洗数据的存储位置(诸如源A1至AN和宿B1至BN)通信。
在框310,可以执行数据的第一传递。第一传递可以涉及将数据从第一源集移动到第一宿集。然后,在框320,可以执行数据的第二传递。第二传递可以涉及将数据从可以对应于第一宿集的第二源集移动到第二宿集。
例如,图4示出数据分发方案和设置的示例,其中在一系列两个传递中使用混洗操作来混洗数据。混洗操作被划分成两个单独的传递:从第一源400到第一宿410的第一传递,以及从对应于第一宿410的第二源420到第二宿430的第二传递。在第一传递中,第一宿410中的每个从第一源400中的三个不同第一源接收数据分段。例如,宿411从源401、402和403中的每个源接收数据。在另一示例中,宿418从源401、402和406中的每个源接收数据。由于宿中的每个被映射到它们从其接收数据的源,所以对图4的混洗的第一传递来说在第一宿与第一源之间存在总共24个映射——针对八个宿中的每个宿有三个映射。在第二传递中,第二宿430中的每个从可以对应于第二宿410的第二源420中的一个接收数据。例如,宿431和432中的每个宿从源421接收数据。例如,宿445和446中的每个宿从源418接收数据。由于宿中的每个被映射到仅它从其接收数据的源,所以对图4的混洗的第二传递来说在第二宿与第二个源之间仅存在总共16个映射——针对十六个宿中的每个宿有一个映射。同时在两传递混洗过程中跟踪的映射的总数是40——第一传递的24个映射和第二传递的16个映射——其小于相同混洗在作为单个传递被执行时的总共64个映射。
随着在混洗操作中涉及的源和宿的数目增加,执行多传递的益处也增加。例如,为了在单个传递中将数据从100,000个源分发到100,000个宿,可能需要每个宿从许多源(例如,10,000个源)读取数据。这将在源与宿之间需要总共10,000*100,000=10亿个映射。即使每个宿将从仅1,000个源读取,也仍然将需要跟踪1亿个映射。引入第二传递将允许数据首先被从源传递到宿的小初始子集,诸如1,000个宿,然后可以将1,000个宿中的每个宿的数据分开地拆分在100个宿当中。在第一传递中,如果1,000个宿中的每个宿从100,000个源中的10,000个或1,000个源读取数据,则映射的总数将分别合计为1000万或100万。在第二传递中,100,000个宿中的每个宿将被映射到单个源,从而合计达总共100,000个映射。因此,在两传递混洗中跟踪的映射总数将在使用10,000个中间宿时合计为1010万个映射,或者在使用1,000个中间宿时合计为110万个映射。这个映射数显著地小于单个传递场景的10亿或1亿个映射。实验表明,使用1,000个中间宿来混洗约2TiB的数据的混洗操作可以是在没有任何中间宿的情况下执行的相同操作的至少两倍。
图4的以上示例图示一个这种两传递混洗,其中第二传递被称为“宿拆分”。在宿拆分中,在多个宿当中分发来自每个给定源的数据,但是每个宿从仅一个源读取数据。多传递混洗的一个传递中的宿拆分可以允许使用更少的宿来执行多传递混洗的其他先前传递。如能够从以上示例场景看到的,将宿拆分作为混洗操作的后续传递引入能够显著地减少先前传递中的开销,因为需要被映射到多个源并且从多个源读取数据的中间宿少得多。宿拆分的引入还能够显著地减少最后传递中的开销,因为最后宿中的每个被映射到仅一个源并且从仅一个源读取数据。
在以上示例中,一个传递的宿被描述为“对应”于下一传递的源。对应可以是一个传递的宿与下一传递的源相同,意味着它们的地址是相同的。替换地,宿的地址可以与下一传递的源的地址不同,但是宿的所有数据都可以被移动到下一传递的对应源的地址。
多传递混洗可以包括进一步操作,诸如第三传递、第四传递等。在图3的示例多传递混洗中,传递被示出为被执行直到第N传递为止。特别地,在框330,可以执行数据的第N-1传递。第N-1传递可以涉及将数据从可以对应于第N-2宿集的第N-1源集移动到第N-1宿集。此外,在框340,可以执行数据的第N传递和最后传递。第N传递可以涉及将数据从可以对应于第N-1宿集的第N源集移动到第N宿集。
N的值可以取决于要混洗的数据量、混洗操作的目的和正在执行的混洗操作的特定类型而变化。例如,在图4的示例所示的宿拆分的情况下,具有相对少数目的传递(诸如两传递)可能就足够了。在其他类型的混洗操作中,将操作划分成三传递或更多传递可以是有益的。
图5示出“蝶式”混洗分发方案的示例。在蝶式算法中,有限数目的源与有限数目的宿配对,然后在已配对的宿当中混洗来自源的数据。因此,在每传递中但在单独的块中混洗所有数据。对整个操作引入多个蝶式传递允许来自每个单独的块的混洗数据与来自其他分开地混洗的块的数据配对。
在图5的示例中,混洗操作包括从第一源500到第一宿510的第一传递、从第二源520(其对应于第一宿510)到第二宿530的第二传递、以及从第三源540(其对应于第二宿530)到第三宿550的第三传递。在第一传递中,第一宿510中的每个从第一源500中的一个接收数据分段。例如,宿511从源501接收数据,宿513从源502接收数据,宿515从源503接收数据,并且宿517从源504接收数据。在每个后续传递中,传递中的每个宿从两个源接收数据的分段,该两个源在前一个传递中已从不同源接收分段。例如,在第二传递中,第二宿531从第二源521和522中的每个第二源接收数据分段,该第二源521在在先第一传递中从第一源501接收数据,该第二源522在在先第一传递中从第一源502接收数据。宿535从第二源525和526中的每个第二源接收数据分段,该第二源525在在先第一传递中从第一源503接收数据,该第二源526在在先第一传递中从第一源504接收数据。又如,在第三传递中,第三宿551和552中的每个第三宿从第三源541和542中的每个第三源接收数据分段,该第三源541在在先第二传递中从第二源521和522接收数据,该第三源542在在先第二传递中从第二源552和526接收数据。
尽管图5的示例示出了仅第一传递、第二传递和第三传递被示出,但是本领域的技术人员将容易地理解,操作可以针对附加传递继续进行。在图5的示例中,每个传递被示出为具有不超过八个源和八个宿,这足以让每个最后宿550包括来自第一源501、502、503和504中的每个第一源的数据分段。随着源和宿的数目增加,可能有必要向蝶式混洗添加更多传递,或者增加每个混洗操作的复杂性。
当从大量源分发数据时,“蝶式”混洗的使用是特别有益的。例如,为了在单个传递中数据从1,000,000个源分发到2,000,000个宿,可能需要每个宿从许多源读取数据。例如,如果每个宿将从10,000个源读取,则混洗将在源与宿之间需要总共10,000*2,000,000=200亿个映射。即使每个宿将从仅1,000个源读取,也仍然将需要跟踪20亿个映射。即使每个宿将从仅8个源读取,如在图5的示例中一样,这种布置也仍然将需要跟踪1600万个映射(针对200万个宿中的每个宿有8个映射)。在多传递蝶式混洗操作内引入初始拆分作为第一传递将允许数据首先被传递到2,000,000个宿,同时在源与宿之间的映射数最少,即2,000,000个映射,或者每个宿各有一个源。然后在每个后续传递中,每个宿可以被映射到两个源,从而对第二传递来说产生总共4,000,000个映射,对第三传递来说产生附加4,000,000个映射,并且对任何后续传递来说相同。总的来说,对于图5所示的三个传递,必须被跟踪的源与宿之间的映射的总数合计达1000万个映射,这显著地低于100亿或10亿个映射,并且仍然低于1600万个映射,分别是在上描述示例单个传递场景中需要的。因此,可以显著地减少混洗操作的每个传递中的开销:第一传递的开销减少了,因为它是拆分;并且后续传递中的开销减少了,因为每个传递需要比1,000,000个源与1,000,000个宿之间的单个传递混洗二次更少的开销。换句话说,尽管在每个传递中执行的混洗的总数线性地增加,但是用于混洗的总处理量总体上二次减少。
在图4所示的宿拆分的以上示例中,每个传递被示出为包括单个操作,而不是独立地执行的操作。在其他示例中,在多传递混洗操作的单个给定传递中执行的操作的数目可以是不止一个。例如,本领域的技术人员将认识到,图5所示的个别蝶式混洗可能被彼此独立地执行,从而在第二传递和第三传递的每个传递中产生多个操作。
附加地,在图5中的蝶式混洗的示例中,独立混洗的数目从一个传递到下一传递保持不变。在其他示例中,在每个传递中执行的独立操作的数目可以从一个传递到下一传递改变。
图6示出涉及多个宿拆分的“混洗拆分”分发方案的示例,其中在每个传递中执行的拆分操作的数目从一个传递到下一传递增加。特别地,在图6的示例中,从第一源600到第一宿610的第一传递,为此在四个宿611、612、613和614当中混洗两个源601和602的数据。在从对应于第一宿610的第二源620到第二宿630的第二传递中,每个源被拆分到两个宿中。例如,源621的数据在宿631和632之间被拆分,源622的数据在宿633和634之间被拆分,源623的数据在宿635和636之间拆分,源624的数据在宿637和638之间被拆分。在从对应于第二宿630的第三源640到第三宿650的第三传递中,每个源被再次拆分到两个宿中。例如,源641的数据在宿651和652之间被拆分,源642的数据在宿653和654之间被拆分,源643的数据在宿655和656之间被拆分,并且源644的数据在宿657和658之间被拆分。
此外,第二传递的混洗操作在两个独立地执行的操作之间被拆分或划分,使得在第一操作682中管理源621和622中的数据的拆分,并且在第二单独的操作684中管理源623和624中的数据拆分。类似地,第二传递的每个给定独立操作的宿在第三传递的两个单独的操作之间被拆分。例如,在第一操作692中管理源641和642中的数据的拆分,并且在第二单独的操作694中管理源643和644中的数据的拆分。类似地,然后在第三传递中在两个单独的拆分操作696、698之间划分在第二传递的操作684中拆分的数据。
在图6的示例中,从一个编到下一编的拆分操作的数目增加了两倍。因此这种混洗拆分的“拆分倍数”据说为二。在其他示例中,独立操作的数目可以增加大于二(诸如三、四、五、八、十等)的倍数。
尽管不可以通过将传递作为多个单独的操作进行处理来减少源与宿之间的映射的总数,但是单独的操作的使用确实允许对于同一传递使用多个混洗日志。这允许使每个混洗日志的整体大小维持在减少的大小。随着源和宿的数目增加并且要在混洗操作期间跟踪的元数据量增加,将元数据划分成单独的混洗日志可以是有益的。例如,并且进一步参考图6,控制从第二源620到第二宿630的第二传递的操作的处理器,诸如图2B的示例中示出的一个或多个处理器210,可以控制操作682和684被顺序地或同时地处理,从而改进对混洗操作的带宽和定时的控制。
例如,将数据从10,000个源混洗到20,000个宿的初始传递可以后面是具有10,000个源至20,000个宿的两个独立混洗的第二传递。第二传递的结果会将数据分发在40,000个宿当中,但是可以在两个单独的混洗日志之间拆分在第二传递中移动的数据块的元数据。当混洗操作涉及远大于源的数目的许多宿时,这可以是特别有帮助的,因为数据在宿的数目增加之前可能在操作的早期传递中被充分地混洗,然后根据由单独的混洗日志管理的独立操作被拆分到许多宿中。以这种方式,每个混洗日志将包含仅读取它自己的操作的块的那些宿的元数据,而不包含传递的其他独立操作的块。
附加地,如果操作被并行或同时处理,并且如果一个操作在另一个操作之前完成,则可以冲洗来自已完成操作的混洗日志的元数据,而不必等待另一个单独的操作也完成。为了图示如果包含100,000个均匀分布的宿的单个“宿拆分”操作的混洗日志将包含50GiB,则冲洗器将在每宿完成时冲洗平均约0.5MiB的数据。然而,如果在100个单独的“宿拆分”操作当中拆分相同的混洗日志数据,每个操作包括1,000个宿,则冲洗器将在每宿完成时冲洗平均约50MiB的数据。
图7示出组合来自图5的蝶形混洗和来自图6的混洗拆分的两个构思的示例数据分发方案。在图7的示例中,第一源700的数据被写入到第一宿710的第一传递、与第一宿710相对应的第二源720的数据被写入到第二宿730的第二传递、以及与第二宿730相对应的第三源740的数据被写入到第三宿750的第三传递。
在每个传递中,每个宿可以从两个源接收数据。此外,在每个传递中,可以将宿分成组,由此每个宿组从相同的两个源接收数据。以这种方式,每个宿组及其对应的两个源的混洗操作可以作为使它自己的混洗日志限于写入到给定宿组的数据的元数据的独立操作被处理。例如,在第二传递中,第二宿735、736、737和738可以全部从第二源723和724接收数据。源723和724与宿735、736、737和738之间的混洗可以在它自己的包含仅写入到宿735、736、737和738的数据的元数据的混洗日志中被独立地管理。
在图7的示例中,从每个传递到下一传递执行的独立操作的数目增加了2倍。也就是说,第一传递涉及两个操作,第二传递涉及四个操作,而第三传递可能涉及八个操作(未全部示出)。
从图7的示例还能够看到,每个第三宿750可以从第一源700中的每个第一源接收数据。例如,第三宿751中的每个可以包括第三源741的数据,该第三源741进而对应于第二宿735。第二宿735可以包括来自所对应的第二源723和724中的每个第二源的数据,该对应的第二源723和724分别对应于第一宿712和716。第一宿712可以包括来自第一源701和702中的每个第一源的数据,并且第一宿716可以包括来自第一源703和704中的每个第一源的数据。因此,能够将第三宿751和752中的数据追溯到第一源700中的每个。
换句话说,图7的示例可以被认为像拆分混洗的原因在于独立操作的数目能够从一个传递到下一个传递倍增,并且可以被认为像蝶式混洗的原因在于能够在后续传递的单个操作中组合宿从先前传递的不同源汲取的数据。这能够向混洗操作添加进一层的复杂性和鲁棒性以便为应用于所收集的数据的查询产生改进的结果。
图4、图5、图6和图7的以上示例演示了可以被执行以便随着数据、源、宿、混洗或其任何组合的量增加而减轻配额的各种类型的传递和混洗操作。一个或多个处理器,诸如图2B的处理器210,可以被编程为跟踪混洗操作的进度并且基于跟踪进度来动态地确定要执行的传递的类型。与各种操作(诸如“加入”或“组混洗”)相关联的自动操作者或计划适配器可以动态地决定是否发起多传递混洗,并且如果是这样的话,决定要应用什么类型的操作。例如,如果数据尚未被一致地分发到加入的两侧,则自动加入计划适配器可以在发起“混洗拆分”之间确定,或者“宿拆分”是数据已经被一致地分发。
在一些示例中,响应于处理器确定监测值超过预定阈值,可以由一个或多个处理器如图2B的处理器210发起混洗拆分。监测值可以是在给定传递中利用的宿的数目,由此宿被拆分成单独的操作,直到每个操作中的宿的数目小于阈值为止。替换地或附加地,监测值可以是给定传递中的源与宿之间的映射的数目,由此宿被拆分成单独的操作,直到每个单独的操作中的源与宿之间的映射的数目小于阈值为止。
以上示例演示其中每个传递被视为单独的操作的多传递混洗的示例。在其他示例中,可以将各传递组合为单个混洗操作,然而同时维持减少的开销和更小的元数据日志的好处。此外,以上示例将每个传递的宿视为彼此分开。在其他示例中,在每个传递的宿之间可能存在重叠。例如,在每个传递写入到比先前传递更多的宿的示例混洗操作中,一个传递的宿可以是下一传递的宿的子集,并且下一传递的宿可以是后续传递的宿的子集,依此类推。
例如,如果将数据从100,000个源混洗到100,000个宿,并且如果在第一传递中使用1,000个宿,则第一传递的1,000个宿可以是要在第二传递中使用的100,000个宿中的1,000个宿。在这样的情况下,可以利用划分方案来防止第一源写入到不是第一宿的剩余99,000个宿中的任一个。这样的划分方案的一个效果是它将先前示例的多个混洗操作变换成基本上从一个预先指定的源集到一个预先指定的宿集的单个混洗操作。
图8和图9示出了可以用于划分这样的单个混洗操作的传递的示例划分方案。图8和图9的示例限于第一传递和第二传递,但是应该理解,可能在具有不止两个传递的混洗操作中重复其中概述的方案。
图8的示例示出“推迟划分”方案,其中第一传递810在第二传递820开始之前完成。第一传递810可以涉及一个或多个处理器,诸如图2B的一个或多个处理器210,指定第一源集(812)和第一宿集(814),并且阻塞第二宿集(816)。当根据来自一个或多个处理器的指令来将数据从第一源集写入到第一宿集(818)时,第二宿集可以保持被阻塞接收任何混洗数据。这可以确保第一传递中的所有混洗数据被仅写入到第一宿,而不写入到第二宿。当第一传递810完全完成时,然后一个或多个处理器可以发起第二传递820的操作,由此可以解除阻塞第二宿集(822),此后可以将来自第二源(其可以对应于第一宿)的数据写入到第二宿集(824)。一旦所有数据都已被写入到第二宿集,那么第二传递就完成了。
图9的替代示例示出“流水线推迟划分”方案,其中第二传递920可以在第一传递910完成之前开始。第一传递910可以涉及一个或多个处理器使用流水线来指定第一源集(812)、第一宿集(814)和第二宿集(816)。流水线可以使用划分方案,由此第一源当中谁都不写入到具有推迟划分的第一宿写入到的宿。即使对其他第一宿的写入仍然尚未完成,第二传递920也可以在对第一宿中的任何一个的写入完成时开始。在第二传递920中,与已完成的第一宿相对应的第二源可以根据划分方案将数据写入到第二宿(922)。随着对每个第一宿的写入操作完成,针对另一对应的第二源的写入操作可以开始,并且这可以继续直到整个第二传递920完成为止。
在一些示例中,多传递混洗的第二传递的发起能够以已经从第一混洗的混洗日志中冲洗数据为条件。这种调节可以确保多传递混洗的混洗日志不占去不必要的空间,并且可以改进冲洗操作的效率。在其他示例中,使用多传递来完成混洗操作本身可以避免对完全从混洗日志中冲洗元数据的需要,因为减少任何给定分段的宿目的地的总数(如在多传递混洗中完成的那样)还将减少需要为每个分段存储的元数据的总量。
在一些示例中,多传递混洗操作的传递可以流水线化。这可能增加一个或多个处理器的配额使用,但具有改进的性能的优点。在这样的示例中,在较早传递中发生的混洗可以被给予更高的优先级,诸如被给予足够的配额,以便避免流水线中的积压。一个或多个处理器可以从调度器程序接收指令以便适当地在流水线化混洗和传递当中分发配额。
在一些示例中,多传递混洗操作的划分方案可以在一个传递中将数据再划分到所有可用宿,然后在后续传递中将数据压缩到可用宿的子集。这样的划分方案可以优化在后续传递中对宿的读取。特别地,如果使用的宿地址是连续的,并且如果混洗日志将可用宿的子集映射到不重叠范围的地址,则后续传递的宿的查找操作可以和给定范围的地址一样简单。结果,源与宿之间的映射可能不占去任何空间,因为能够基于宿自己的地址确定宿被指定读取的源,而不必存储宿与源地址之间的单独的映射。
在以上示例中,每个传递可以使用散列划分函数以便将数据从源引导到它们相应的目的地宿。每个传递中使用的散列划分函数可以与先前传递的散列函数相关。类似地,在“混洗拆分”的情况下,由此单独的混洗操作在单个传递中被分开地进行并且从先前传递的公共混洗操作中拆分,单独的混洗操作中的每个可以使用与先前传递的散列函数相关的相应的散列函数。
上述示例通常解决在设法对大于约10TiB的数据量(诸如数十TiB的数据)运行复杂查询时出现的问题。这样的数据量通常需要使用超过10,000个宿,这为常规单个传递混洗操作带来缩放困难。本领域的技术人员将认识到,本文描述的多传递混洗操作的优点也适用于更少量的数据。也就是说,即使可能使用常规单个传递混洗操作来处理那些较小量,本文描述的多传递混洗操作也可以改进效率并且减少操作的整体成本和开销。实际上,一些实验已表明,大约1TiB的较小输入数据量的整体加速可以大于大约10TiB的较大输入数据量的加速。
尽管已经参考特定实施例描述了本文的技术,但是应当理解,这些实施例仅仅图示本技术的原理和应用。因此应当理解,在不脱离如由所附权利要求所限定的本技术的精神和范围的情况下,可以对说明性实施例做出许多修改并且可以设计其他布置。
大多数前面的替代示例不是相互排斥的,而是可以被以各种组合实现以实现独特优点。由于能够在不脱离如由权利要求所限定的主题的情况下利用以上讨论的特征的这些及其他变化和组合,所以实施例的前面的描述应该作为图示而不作为由权利要求所限定的主题的限制被理解。作为示例,不必以上述精确次序执行前面的操作。相反,能够以不同次序如相反次序或同时地处理各个步骤。除非另外陈述,否则还能够省略步骤。另外,本文描述的示例以及措辞为“诸如”、“包括”等的子句的提供不应该被解释为将权利要求的主题限于特定示例;相反,示例旨在图示许多可能的实施例中的仅一个。此外,不同附图中的相同的附图标记能够标识相同或类似的元件。