Incremental update to MapReduce results

I have a MapReduce job which populates the search index in HBase. This MapReduce job runs daily on the complete data set. Is there a way I can just run my MapReduce on new data which arrived after the last time indexes were computed and then properly update the search index in HBase?


Source: https://stackoverflow.com/questions/15639294
Updated: 2022-11-05 13:11

Best answer

It sounds like the operator you may be looking for is scan.

import { Subject } from 'rxjs';
import { scan } from 'rxjs/operators';
let arraySubject = new Subject();
let array$ = arraySubject.pipe(scan((fullArray, newValue) => fullArray.concat([newValue]), []));

scan accumulates values over time in an observable stream: for each item in the stream, it calls a function with the last accumulated value and the current value, then emits the result. The example above takes each new value and appends it to your full array; the second argument to scan seeds the accumulator with an empty array.
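
To make the accumulation concrete without pulling in RxJS, here is a minimal plain-array sketch of what scan computes. Note that scanArray is a hypothetical helper written for illustration, not an RxJS function; it returns every intermediate accumulator value, which corresponds to the sequence of values an observable piped through scan would emit.

```typescript
// Hypothetical helper (not part of RxJS): a plain-array analogue of scan.
// It returns every intermediate accumulator value, one per input value.
function scanArray<T, A>(
  values: T[],
  reducer: (acc: A, value: T) => A,
  seed: A
): A[] {
  const emitted: A[] = [];
  let acc = seed;
  for (const value of values) {
    acc = reducer(acc, value); // same reducer shape that scan expects
    emitted.push(acc);         // the seed itself is never emitted
  }
  return emitted;
}

// Same reducer as in the answer: append each new value to the full array.
const emissions = scanArray<number, number[]>(
  [1, 2, 3],
  (fullArray, newValue) => fullArray.concat([newValue]),
  []
);

console.log(JSON.stringify(emissions));
```

Each element of emissions is one emission: the array grows by one value per input.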

This is clearly restrictive, though, since it only does ONE thing, which may not be flexible enough. In that case you need to get clever:

let arraySubject = new Subject();
let array$ = arraySubject.pipe(scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []));

Now you're passing in an "action" with two parts: a modifier function, which defines how you want to modify the full array, and a payload carrying any additional data the modifier needs alongside the full array.

So you might do:

// return a new array without the item (splice would return the removed elements instead)
let modifier = (full, item) => full.filter(v => v !== item);
arraySubject.next({modifier, payload: itemToRemove});

which removes the item you sent through. You can extend this pattern to literally any array modification.
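
As a rough, dependency-free sketch of the action pattern: the accumulator becomes whatever the modifier returns, so a modifier has to return the new full array. The names Action, appendItem, removeItem and applyActions below are illustrative assumptions, not part of RxJS; reduce stands in for scan and yields only the final state.

```typescript
// Hypothetical shape of an "action": a modifier plus the data it needs.
interface Action<T> {
  modifier: (full: T[], payload: T) => T[];
  payload: T;
}

// Pure modifiers: each returns a new array instead of mutating the input.
const appendItem = <T>(full: T[], item: T): T[] => full.concat([item]);
const removeItem = <T>(full: T[], item: T): T[] => full.filter(v => v !== item);

// reduce plays the role of scan here, keeping only the final state.
function applyActions<T>(actions: Action<T>[], seed: T[]): T[] {
  return actions.reduce<T[]>(
    (full, { modifier, payload }) => modifier(full, payload),
    seed
  );
}

const finalState = applyActions<number>(
  [
    { modifier: appendItem, payload: 1 },
    { modifier: appendItem, payload: 2 },
    { modifier: removeItem, payload: 1 },
  ],
  []
);

// Pitfall: splice returns the removed elements, not the remaining array,
// so using its return value as the new state would keep the wrong items.
const removedBySplice = [1, 2, 3].splice(0, 1);

console.log(JSON.stringify(finalState), JSON.stringify(removedBySplice));
```

Keeping modifiers pure also means earlier states are never changed behind a subscriber's back.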

A "gotcha" with scan, though, is that each subscription runs its own scan, so subscribers only get the value accumulated from the TIME THEY SUBSCRIBED. So this will happen:

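// seed the BehaviorSubject with a no-op action so its initial value is a valid {modifier, payload}
let noop = (full) => full;
let arraySubject = new BehaviorSubject({modifier: noop, payload: undefined});
let array$ = arraySubject.pipe(scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []));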
let subscriber1 = array$.subscribe();
//subscriber1 gets []
let modifier = (full, val) => full.concat([val]);
arraySubject.next({modifier, payload:1});
//subscriber1 gets [1]
arraySubject.next({modifier, payload:2});
//subscriber1 gets [1,2]
let subscriber2 = array$.subscribe();
//subscriber2 gets [2]
arraySubject.next({modifier, payload:3});
//subscriber1 gets [1,2,3]
//subscriber2 gets [2,3]

See what happened there? The only thing stored in the BehaviorSubject was the second action, not the full array. Each scan holds its own copy of the full array, so the second subscriber only sees the second action onward, since it wasn't subscribed during the first. So you need a persistent subscriber pattern:

let arraySubject = new BehaviorSubject([]);
let arrayModifierSubject = new Subject();
arrayModifierSubject.pipe(scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), [])).subscribe(arraySubject);

You modify the array by calling next on arrayModifierSubject:

let modifier = (full, val) => full.concat([val]);
arrayModifierSubject.next({modifier, payload: 1});

And your subscribers get the array from the array source:

subscriber1 = arraySubject.subscribe();

In this setup, all array modifications go through the modifier subject, which in turn broadcasts the result to the BehaviorSubject, which stores the full array for future subscribers and broadcasts it to current subscribers. The BehaviorSubject (the store subject) is persistently subscribed to the modifier subject (the action subject) and is the ONLY subscriber to it, so the full array is never lost: the single scan accumulates every action from the start.
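
The behaviour of this wiring can be imitated in a few lines of plain TypeScript. The sketch below is only an analogy: TinyStore is a hypothetical class, not an RxJS type. Its publish method plays the role of calling next on the modifier subject, and its subscribe method replays the current state the way a BehaviorSubject does.

```typescript
type StateListener<T> = (state: T) => void;

// Hypothetical, dependency-free stand-in for the
// Subject + scan + BehaviorSubject wiring; not an RxJS class.
class TinyStore<T> {
  private listeners: StateListener<T>[] = [];

  constructor(private state: T) {}

  // Roughly what arrayModifierSubject.next({modifier, payload}) triggers:
  // one shared accumulator is updated, then broadcast to all listeners.
  publish<P>(modifier: (state: T, payload: P) => T, payload: P): void {
    this.state = modifier(this.state, payload);
    this.listeners.forEach(listener => listener(this.state));
  }

  // Like a BehaviorSubject, a new subscriber immediately receives
  // the current full state, then every later update.
  subscribe(listener: StateListener<T>): void {
    this.listeners.push(listener);
    listener(this.state);
  }
}

const tinyStore = new TinyStore<number[]>([]);
const appendValue = (full: number[], val: number): number[] => full.concat([val]);

const seenByFirst: number[][] = [];
const seenBySecond: number[][] = [];

tinyStore.subscribe(state => { seenByFirst.push(state); });
tinyStore.publish(appendValue, 1);
tinyStore.publish(appendValue, 2);
tinyStore.subscribe(state => { seenBySecond.push(state); }); // late subscriber
tinyStore.publish(appendValue, 3);

console.log(JSON.stringify(seenByFirst), JSON.stringify(seenBySecond));
```

Because there is a single accumulator, the late subscriber starts from the full array rather than from the last action alone.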

Some sample usages (with the above setup):

// insert 1 at end
let insertAtEnd = (full, value) => full.concat([value]);
arrayModifierSubject.next({modifier: insertAtEnd, payload: 1});

// insert 1 at start
let insertAtStart = (full, value) => [value].concat(full);
arrayModifierSubject.next({modifier: insertAtStart, payload: 1});

// remove 1 (filter returns the new array; splice would return the removed elements)
let remove = (full, value) => full.filter(v => v !== value);
arrayModifierSubject.next({modifier: remove, payload: 1});

// change all instances of 1 to 2
let replaceAll = (full, value) => full.map(v => (v === value.target) ? value.newValue : v);
arrayModifierSubject.next({modifier: replaceAll, payload: {target: 1, newValue: 2}});

You can wrap any of these in a "publishNumbersChange" function. Exactly how you implement this depends on your needs. You can make functions like:

function insertNumber(numberToInsert: number) {
   let modifier = (full, val) => full.concat([val]);
   publishNumbersChange(modifier, numberToInsert);
}

function publishNumbersChange(modifier, payload) {
   arrayModifierSubject.next({modifier, payload});
}

Or you can declare an interface, implement it with classes, and publish instances of those:

function publishNumbersChange({modifier, payload}: NumberArrayModifier) {
   arrayModifierSubject.next({modifier, payload});
}

interface NumberArrayModifier {
    modifier: (full: number[], payload:any) => number[];
    payload: any;
}

class InsertNumber implements NumberArrayModifier {
    modifier = (full: number[], payload: number): number[] => full.concat([payload]);
    payload: number;
    constructor(numberToInsert:number) {
        this.payload = numberToInsert;
    }
}

publishNumbersChange(new InsertNumber(1));

You can extend similar functionality to any array modification. One last pro tip: lodash is a huge help when defining your modifiers in this type of system.

So, how might this look in an Angular service context?

This is a very simple implementation that isn't highly reusable, but other implementations could be:

import { Injectable } from '@angular/core';
import { BehaviorSubject, Subject } from 'rxjs';
import { scan } from 'rxjs/operators';

const INIT_STATE: number[] = [];
@Injectable()
export class NumberArrayService {
    private numberArraySource = new BehaviorSubject(INIT_STATE);
    private numberArrayModifierSource = new Subject<any>();
    numberArray$ = this.numberArraySource.asObservable();

    constructor() {
        // payload is undefined for modifiers that ignore it (sort, reset)
        this.numberArrayModifierSource.pipe(scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), INIT_STATE)).subscribe(this.numberArraySource);
    }

    private publishNumberChange(modifier, payload?) {
        this.numberArrayModifierSource.next({modifier, payload});
    }

    insertNumber(numberToInsert) {
        let modifier = (full, val) => full.concat([val]);
        this.publishNumberChange(modifier, numberToInsert);
    }

    removeNumber(numberToRemove) {
        // filter returns the new array; splice would return the removed elements
        let modifier = (full, val) => full.filter(v => v !== val);
        this.publishNumberChange(modifier, numberToRemove);
    }

    sort() {
        // copy before sorting so earlier states are not mutated; compare numerically
        let modifier = (full, val) => [...full].sort((a, b) => a - b);
        this.publishNumberChange(modifier);
    }

    reset() {
        let modifier = (full, val) => INIT_STATE;
        this.publishNumberChange(modifier);
    }
}

Usage here is simple: subscribers just subscribe to numberArray$ and modify the array by calling the service's functions. You can use this simple pattern to extend functionality however you like. It controls access to your number array and makes sure it is only ever modified in ways defined by the API, and your state and your subject are always one and the same.

OK but how is this made generic/reusable?

import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { scan } from 'rxjs/operators';

export interface Modifier<T> {
    modifier: (state: T, payload: any) => T;
    payload?: any;
}

export class StoreSubject<T> {
    private storeSource: BehaviorSubject<T>;
    private modifierSource: Subject<Modifier<T>>;
    store$: Observable<T>;

    publish(modifier: Modifier<T>): void {
        this.modifierSource.next(modifier);
    }

    constructor(init_state: T) {
        this.storeSource = new BehaviorSubject<T>(init_state);
        this.modifierSource = new Subject<Modifier<T>>();
        // the store is the only subscriber, so a single scan holds the full state
        this.modifierSource.pipe(scan((acc: T, modifier: Modifier<T>) => modifier.modifier(acc, modifier.payload), init_state)).subscribe(this.storeSource);
        this.store$ = this.storeSource.asObservable();
    }
}

And your service becomes:

import { Injectable } from '@angular/core';

const INIT_STATE: number[] = [];
@Injectable()
export class NumberArrayService {
    private numberArraySource = new StoreSubject<number[]>(INIT_STATE);
    numberArray$ = this.numberArraySource.store$;

    constructor() {
    }

    insertNumber(numberToInsert: number) {
        let modifier = (full, val) => full.concat([val]);
        this.numberArraySource.publish({modifier, payload: numberToInsert});
    }

    removeNumber(numberToRemove: number) {
        // filter returns the new array; splice would return the removed elements
        let modifier = (full, val) => full.filter(v => v !== val);
        this.numberArraySource.publish({modifier, payload: numberToRemove});
    }

    sort() {
        // copy before sorting so earlier states are not mutated; compare numerically
        let modifier = (full, val) => [...full].sort((a, b) => a - b);
        this.numberArraySource.publish({modifier});
    }

    reset() {
        let modifier = (full, val) => INIT_STATE;
        this.numberArraySource.publish({modifier});
    }
}
