Incremental update to MapReduce results
I have a MapReduce job that populates the search index in HBase. This MapReduce job runs daily on the complete data set. Is there a way I can run my MapReduce job only on the new data that arrived since the indexes were last computed, and then properly update the search index in HBase?
Source: https://stackoverflow.com/questions/15639294
Best answer
It sounds like the operator you may be looking for is scan.
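    import { BehaviorSubject, Subject } from 'rxjs';
    import { scan } from 'rxjs/operators';

    let arraySubject = new Subject<number>(); // BehaviorSubject([]) would append its initial []
    let array$ = arraySubject.pipe(scan((fullArray, newValue) => fullArray.concat([newValue]), [] as number[]));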
Scan accumulates values over time in an observable stream: for each item in the stream, it calls your function with the last accumulated value and the current item, then emits the result. The example above takes each new value and appends it to the full array; the second argument to scan (the seed) initializes the accumulator to an empty array.
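To make the accumulation concrete, here is a small dependency-free sketch of what scan emits for a finite list of values. scanValues is a hypothetical helper written for illustration (it is not an RxJS function); it returns every intermediate accumulator in the order scan would emit it. Like RxJS scan, it does not emit the seed on its own.

```typescript
// Hypothetical helper (not part of RxJS): mimics scan over a finite list of
// values, returning every intermediate accumulator that scan would emit.
function scanValues<T, A>(
  values: readonly T[],
  reducer: (acc: A, value: T) => A,
  seed: A,
): A[] {
  const emissions: A[] = [];
  let acc = seed;
  for (const value of values) {
    acc = reducer(acc, value); // combine the last accumulator with the new value
    emissions.push(acc);       // scan emits once per incoming value
  }
  return emissions;
}

// Same reducer as the example above: append each value to the full array.
const emitted = scanValues<number, number[]>(
  [1, 2, 3],
  (fullArray, newValue) => fullArray.concat([newValue]),
  [],
);
console.log(JSON.stringify(emitted)); // [[1],[1,2],[1,2,3]]
```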
This is clearly restrictive, though, since it only does ONE thing, which may not be robust enough. In that case you need to get clever:
    let arraySubject = new Subject<any>(); // emits actions: { modifier, payload }
    let array$ = arraySubject.pipe(scan((fullArray, { modifier, payload }) => modifier(fullArray, payload), [] as number[]));
Now you're passing in an "action" that has a modifier function, which defines how you want to modify the full array, and a payload carrying any additional data the modifier needs; the payload is passed into the modifier function along with the full array.
So you might do:
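    // splice() mutates and returns the removed items, so build a new array instead
    let modifier = (full: number[], item: number) => full.filter((_, i) => i !== full.indexOf(item));
    arraySubject.next({ modifier, payload: itemToRemove });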
This removes the first occurrence of the item you sent through. You can extend this pattern to literally any array modification.
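One caution about modifiers: Array.prototype.splice mutates the array in place and returns the removed elements, not the updated array, so a modifier built directly on it hands scan the wrong value. A minimal sketch of the pitfall and of a non-mutating alternative, assuming you want to remove only the first occurrence (removeFirst is an illustrative name):

```typescript
// Pitfall: splice mutates `full` and returns the *removed* items.
const full = [1, 2, 3];
const returned = full.splice(full.indexOf(2), 1);
console.log(JSON.stringify(returned), JSON.stringify(full)); // [2] [1,3]

// Illustrative non-mutating modifier: returns a new array without the first
// occurrence of `item` (or a plain copy if `item` is absent).
const removeFirst = <T>(arr: readonly T[], item: T): T[] => {
  const index = arr.indexOf(item);
  return index === -1 ? [...arr] : [...arr.slice(0, index), ...arr.slice(index + 1)];
};

const before = [1, 2, 2, 3];
const after = removeFirst(before, 2);
console.log(JSON.stringify(after), JSON.stringify(before)); // [1,2,3] [1,2,2,3]
```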
A "gotcha" with scan, though, is that each subscription keeps its own accumulator, so subscribers only get the value accumulated from the TIME THEY SUBSCRIBED. So this will happen:
    let arraySubject = new BehaviorSubject<any>({ modifier: (full: number[]) => full }); // seed: a no-op action
    let array$ = arraySubject.pipe(scan((fullArray, { modifier, payload }) => modifier(fullArray, payload), [] as number[]));
    let subscriber1 = array$.subscribe(); // subscriber1 gets []
    let modifier = (full: number[], val: number) => full.concat([val]);
    arraySubject.next({ modifier, payload: 1 }); // subscriber1 gets [1]
    arraySubject.next({ modifier, payload: 2 }); // subscriber1 gets [1,2]
    let subscriber2 = array$.subscribe(); // subscriber2 gets [2]
    arraySubject.next({ modifier, payload: 3 });
    // subscriber1 gets [1,2,3]
    // subscriber2 gets [2,3]
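To check that trace without RxJS, here is a tiny hand-written simulation. LatestValueSubject and scanSubscribe are hypothetical stand-ins (not RxJS classes): the first remembers and replays only its latest value, like a BehaviorSubject, and the second gives every subscription its own accumulator, like `pipe(scan(...))`.

```typescript
type Action = { modifier: (full: number[], payload: number) => number[]; payload: number };

// Hypothetical stand-in for a BehaviorSubject: keeps only the latest value
// and replays it to each new listener.
class LatestValueSubject<T> {
  private listeners: Array<(value: T) => void> = [];
  constructor(private latest: T) {}
  next(value: T): void {
    this.latest = value;
    this.listeners.forEach((listener) => listener(value));
  }
  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
    listener(this.latest); // replay the latest value only
  }
}

// Hypothetical stand-in for `source.pipe(scan(...)).subscribe(...)`:
// every call creates its own, independent accumulator.
function scanSubscribe(source: LatestValueSubject<Action>, log: number[][]): void {
  let acc: number[] = [];
  source.subscribe(({ modifier, payload }) => {
    acc = modifier(acc, payload);
    log.push(acc);
  });
}

const append = (full: number[], val: number) => full.concat([val]);
// Seed with a no-op action so the initial replay leaves the array empty.
const actions = new LatestValueSubject<Action>({ modifier: (full) => full, payload: 0 });

const log1: number[][] = [];
const log2: number[][] = [];
scanSubscribe(actions, log1);
actions.next({ modifier: append, payload: 1 });
actions.next({ modifier: append, payload: 2 });
scanSubscribe(actions, log2); // late subscriber: replays only the payload-2 action
actions.next({ modifier: append, payload: 3 });

console.log(JSON.stringify(log1)); // [[],[1],[1,2],[1,2,3]]
console.log(JSON.stringify(log2)); // [[2],[2,3]]
```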
See what happened there? The only thing stored in the BehaviorSubject is the latest action (here the second one), not the full array; the full array lives inside scan's accumulator, and every subscription gets its own. So the second subscriber only replays the second action, because it wasn't subscribed during the first one. What you need is a persistent subscriber pattern:
    let arraySubject = new BehaviorSubject<number[]>([]);
    let arrayModifierSubject = new Subject<any>();
    arrayModifierSubject
      .pipe(scan((fullArray, { modifier, payload }) => modifier(fullArray, payload), [] as number[]))
      .subscribe(arraySubject);
You modify the array by calling next on arrayModifierSubject:
    let modifier = (full: number[], val: number) => full.concat([val]);
    arrayModifierSubject.next({ modifier, payload: 1 });
Your subscribers get the array from the array source (the BehaviorSubject):
    let subscriber1 = arraySubject.subscribe(fullArray => console.log(fullArray));
In this setup, all array modifications go through the modifier subject; scan accumulates them and passes each result on to the BehaviorSubject, which stores the full array for future subscribers and broadcasts it to current ones. Because the BehaviorSubject (the store subject) is persistently subscribed to the modifier subject (the action subject) and is its ONLY subscriber, there is exactly one scan accumulator, and it has seen every action since the start, so the full array is never lost.
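Stripped of RxJS, the persistent subscriber pattern boils down to one shared accumulator that keeps the current state and hands it to late subscribers. A minimal dependency-free sketch; createArrayStore, dispatch and subscribe are illustrative names, not an RxJS API.

```typescript
type Modifier = (full: number[], payload: number) => number[];

// Illustrative store: one accumulator shared by everyone (the scan's role),
// plus the latest state kept for late subscribers (the BehaviorSubject's role).
function createArrayStore(initial: number[]) {
  let state = initial;
  const listeners: Array<(state: number[]) => void> = [];
  return {
    dispatch(modifier: Modifier, payload: number): void {
      state = modifier(state, payload); // the single accumulator sees every action
      listeners.forEach((listener) => listener(state));
    },
    subscribe(listener: (state: number[]) => void): void {
      listeners.push(listener);
      listener(state); // late subscribers get the full current array
    },
  };
}

const append: Modifier = (full, val) => full.concat([val]);
const store = createArrayStore([]);
const seen1: number[][] = [];
const seen2: number[][] = [];

store.subscribe((s) => seen1.push(s));
store.dispatch(append, 1);
store.dispatch(append, 2);
store.subscribe((s) => seen2.push(s)); // late subscriber
store.dispatch(append, 3);

console.log(JSON.stringify(seen1)); // [[],[1],[1,2],[1,2,3]]
console.log(JSON.stringify(seen2)); // [[1,2],[1,2,3]]
```

Compared with the gotcha example, the late subscriber now starts from [1,2] instead of [2], because it reads the shared state rather than replaying the last action into a fresh accumulator.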
Some sample usages (with the setup above):
    // insert 1 at end
    let insertAtEnd = (full: number[], value: number) => full.concat([value]);
    arrayModifierSubject.next({ modifier: insertAtEnd, payload: 1 });
    // insert 1 at start
    let insertAtStart = (full: number[], value: number) => [value].concat(full);
    arrayModifierSubject.next({ modifier: insertAtStart, payload: 1 });
    // remove (the first) 1; splice() would mutate and return the removed items
    let removeOne = (full: number[], value: number) => full.filter((_, i) => i !== full.indexOf(value));
    arrayModifierSubject.next({ modifier: removeOne, payload: 1 });
    // change all instances of 1 to 2
    let replaceAll = (full: number[], value: { target: number; newValue: number }) =>
      full.map(v => (v === value.target ? value.newValue : v));
    arrayModifierSubject.next({ modifier: replaceAll, payload: { target: 1, newValue: 2 } });
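Because every modifier is a pure (state, payload) => state function, you can check modifiers like these without RxJS by folding a list of actions with Array.prototype.reduce, which computes the same final value that scan reaches one emission at a time. A sketch using non-mutating versions of the sample modifiers; the action log is made up for illustration.

```typescript
type Action<P> = { modifier: (full: number[], payload: P) => number[]; payload: P };

// Non-mutating versions of the sample modifiers.
const insertAtEnd = (full: number[], value: number) => full.concat([value]);
const insertAtStart = (full: number[], value: number) => [value].concat(full);
const removeFirst = (full: number[], value: number) => {
  const i = full.indexOf(value);
  return i === -1 ? full : [...full.slice(0, i), ...full.slice(i + 1)];
};
const replaceAll = (full: number[], value: { target: number; newValue: number }) =>
  full.map((v) => (v === value.target ? value.newValue : v));

// An illustrative action log; `any` lets different payload types share one list.
const log: Action<any>[] = [
  { modifier: insertAtEnd, payload: 1 },   // [1]
  { modifier: insertAtEnd, payload: 3 },   // [1,3]
  { modifier: insertAtStart, payload: 1 }, // [1,1,3]
  { modifier: removeFirst, payload: 3 },   // [1,1]
  { modifier: replaceAll, payload: { target: 1, newValue: 2 } }, // [2,2]
];

// Folding the whole log yields the state scan would hold after the last action.
const finalState = log.reduce((full, { modifier, payload }) => modifier(full, payload), [] as number[]);
console.log(JSON.stringify(finalState)); // [2,2]
```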
You can wrap any of these functions in a "publishNumbersChange" function. Exactly how you implement this depends on your needs; you could write functions like:
    function insertNumber(numberToInsert: number) {
      let modifier = (full: number[], val: number) => full.concat([val]);
      publishNumbersChange(modifier, numberToInsert);
    }

    function publishNumbersChange(modifier: (full: number[], payload: any) => number[], payload?: any) {
      arrayModifierSubject.next({ modifier, payload });
    }
Or you can declare an interface, implement it with classes, and publish instances of those:
    function publishNumbersChange({ modifier, payload }: NumberArrayModifier) {
      arrayModifierSubject.next({ modifier, payload });
    }

    interface NumberArrayModifier {
      modifier: (full: number[], payload: any) => number[];
      payload: any;
    }

    class InsertNumber implements NumberArrayModifier {
      modifier = (full: number[], payload: number): number[] => full.concat([payload]);
      payload: number;
      constructor(numberToInsert: number) {
        this.payload = numberToInsert;
      }
    }

    publishNumbersChange(new InsertNumber(1));
You can extend the same approach to any array modification. One last pro tip: lodash is a huge help when defining modifiers in this kind of system.
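For example, removal and sorting can follow the same NumberArrayModifier interface. A sketch, assuming that interface; RemoveNumber, SortNumbers and applyModifier are illustrative names. The sort copies the array first because Array.prototype.sort works in place, and it passes a numeric comparator because sort with no comparator compares elements as strings, so [10, 9, 1] would come out as [1, 10, 9].

```typescript
interface NumberArrayModifier {
  modifier: (full: number[], payload: any) => number[];
  payload: any;
}

// Illustrative: removes the first occurrence of a number without mutating `full`.
class RemoveNumber implements NumberArrayModifier {
  modifier = (full: number[], payload: number): number[] => {
    const i = full.indexOf(payload);
    return i === -1 ? full : [...full.slice(0, i), ...full.slice(i + 1)];
  };
  constructor(public payload: number) {}
}

// Illustrative: ascending numeric sort on a copy of the array.
class SortNumbers implements NumberArrayModifier {
  modifier = (full: number[]): number[] => [...full].sort((a, b) => a - b);
  payload = undefined;
}

// Applies a modifier the way the scan reducer would: modifier(full, payload).
const applyModifier = (m: NumberArrayModifier, full: number[]) => m.modifier(full, m.payload);

const state = [10, 9, 1, 9];
const afterRemove = applyModifier(new RemoveNumber(9), state);
const sorted = applyModifier(new SortNumbers(), afterRemove);

console.log(JSON.stringify(afterRemove));        // [10,1,9]
console.log(JSON.stringify(sorted));             // [1,9,10]
console.log(JSON.stringify([10, 9, 1].sort()));  // [1,10,9]
```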
So, how might this look in an Angular service?
Here is a very simple implementation. It isn't highly reusable, but other implementations could be:
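    import { Injectable } from '@angular/core';
    import { BehaviorSubject, Subject } from 'rxjs';
    import { scan } from 'rxjs/operators';

    type NumberModifier = (full: number[], payload?: any) => number[];
    const INIT_STATE: number[] = [];

    @Injectable()
    export class NumberArrayService {
      private numberArraySource = new BehaviorSubject<number[]>(INIT_STATE);
      private numberArrayModifierSource = new Subject<{ modifier: NumberModifier; payload?: any }>();
      numberArray$ = this.numberArraySource.asObservable();

      constructor() {
        this.numberArrayModifierSource
          .pipe(scan((fullArray, { modifier, payload }) => modifier(fullArray, payload), INIT_STATE))
          .subscribe(this.numberArraySource);
      }

      private publishNumberChange(modifier: NumberModifier, payload?: any) {
        this.numberArrayModifierSource.next({ modifier, payload });
      }

      insertNumber(numberToInsert: number) {
        const modifier: NumberModifier = (full, val) => full.concat([val]);
        this.publishNumberChange(modifier, numberToInsert);
      }

      removeNumber(numberToRemove: number) {
        // remove the first occurrence without mutating (splice returns the removed items)
        const modifier: NumberModifier = (full, val) => full.filter((_, i) => i !== full.indexOf(val));
        this.publishNumberChange(modifier, numberToRemove);
      }

      sort() {
        // copy first (sort mutates) and compare numerically (the default sort compares strings)
        const modifier: NumberModifier = full => [...full].sort((a, b) => a - b);
        this.publishNumberChange(modifier);
      }

      reset() {
        const modifier: NumberModifier = () => INIT_STATE;
        this.publishNumberChange(modifier);
      }
    }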
Usage here is simple: subscribers just subscribe to numberArray$ and modify the array by calling the service's methods. You can use this simple pattern to extend functionality however you like. It controls access to your number array, makes sure it is only ever modified in the ways defined by the API, and keeps your state and your subject one and the same.
OK, but how do you make this generic/reusable?
    import { BehaviorSubject, Observable, Subject } from 'rxjs';
    import { scan } from 'rxjs/operators';

    export interface Modifier<T> {
      modifier: (state: T, payload: any) => T;
      payload?: any;
    }

    export class StoreSubject<T> {
      private storeSource: BehaviorSubject<T>;
      private modifierSource: Subject<Modifier<T>>;
      store$: Observable<T>;

      publish(modifier: Modifier<T>): void {
        this.modifierSource.next(modifier);
      }

      constructor(init_state: T) {
        this.storeSource = new BehaviorSubject<T>(init_state);
        this.modifierSource = new Subject<Modifier<T>>();
        this.modifierSource
          .pipe(scan((acc: T, modifier: Modifier<T>) => modifier.modifier(acc, modifier.payload), init_state))
          .subscribe(this.storeSource);
        this.store$ = this.storeSource.asObservable();
      }
    }
And your service becomes:
    import { Injectable } from '@angular/core';

    const INIT_STATE: number[] = [];

    @Injectable()
    export class NumberArrayService {
      private numberArraySource = new StoreSubject<number[]>(INIT_STATE);
      numberArray$ = this.numberArraySource.store$;

      insertNumber(numberToInsert: number) {
        const modifier = (full: number[], val: number) => full.concat([val]);
        this.numberArraySource.publish({ modifier, payload: numberToInsert });
      }

      removeNumber(numberToRemove: number) {
        // non-mutating removal of the first occurrence
        const modifier = (full: number[], val: number) => full.filter((_, i) => i !== full.indexOf(val));
        this.numberArraySource.publish({ modifier, payload: numberToRemove });
      }

      sort() {
        // copy, then sort numerically
        const modifier = (full: number[]) => [...full].sort((a, b) => a - b);
        this.numberArraySource.publish({ modifier });
      }

      reset() {
        const modifier = () => INIT_STATE;
        this.numberArraySource.publish({ modifier });
      }
    }
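Nothing in the generic store depends on arrays, so the same shape works for any state type T. To show that without pulling in RxJS, here is a sketch with a hypothetical stand-in, MiniStore, that mirrors StoreSubject's publish/subscribe shape: its publish step plays the role of scan, and its stored state field plays the role of the BehaviorSubject. The Counter state is made up for illustration.

```typescript
interface Modifier<T> {
  modifier: (state: T, payload: any) => T;
  payload?: any;
}

// Hypothetical RxJS-free stand-in with the same publish/subscribe shape:
// one accumulator, latest state replayed to new subscribers.
class MiniStore<T> {
  private listeners: Array<(state: T) => void> = [];
  constructor(private state: T) {}
  publish(m: Modifier<T>): void {
    this.state = m.modifier(this.state, m.payload);
    this.listeners.forEach((listener) => listener(this.state));
  }
  subscribe(listener: (state: T) => void): void {
    this.listeners.push(listener);
    listener(this.state);
  }
}

// The same class, reused with a non-array state type.
type Counter = { count: number; label: string };
const counter = new MiniStore<Counter>({ count: 0, label: "clicks" });
const increment: Modifier<Counter> = {
  modifier: (s, by: number) => ({ ...s, count: s.count + by }),
  payload: 5,
};

const seen: number[] = [];
counter.publish(increment);
counter.subscribe((s) => seen.push(s.count)); // late subscriber sees count 5
counter.publish(increment);
console.log(JSON.stringify(seen)); // [5,10]
```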