Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import { normalizeNFC } from 'vs/base/common/normalization';
import { realcaseSync } from 'vs/base/node/extfs';
import { isMacintosh } from 'vs/base/common/platform';
import * as watcherCommon from 'vs/workbench/services/files/node/watcher/common';
import { IWatcherRequest, IWatcherService, IWatcherOptions } from 'vs/workbench/services/files/node/watcher/unix/watcher';
import { IWatcherRequest, IWatcherService, IWatcherOptions, IWatchError } from 'vs/workbench/services/files/node/watcher/unix/watcher';
import { Emitter, Event } from 'vs/base/common/event';

interface IWatcher {
requests: ExtendedWatcherRequest[];
Expand All @@ -44,29 +45,20 @@ export class ChokidarWatcherService implements IWatcherService {
private _watchers: { [watchPath: string]: IWatcher };
private _watcherCount: number;

private _watcherPromise: TPromise<void>;
private _options: IWatcherOptions & IChockidarWatcherOptions;

private spamCheckStartTime: number;
private spamWarningLogged: boolean;
private enospcErrorLogged: boolean;
private _errorCallback: (error: Error) => void;
private _fileChangeCallback: (changes: watcherCommon.IRawFileChange[]) => void;

public initialize(options: IWatcherOptions & IChockidarWatcherOptions): TPromise<void> {
private _onWatchEvent = new Emitter<watcherCommon.IRawFileChange[] | IWatchError>();
readonly onWatchEvent = this._onWatchEvent.event;

/**
 * Stores the supplied options, resets the watcher registry and its counter,
 * and hands back an event typed to carry either a batch of raw file changes
 * or an `IWatchError`.
 *
 * NOTE(review): as rendered, this body contains both the promise-based set-up
 * (`_watcherPromise`, `_errorCallback`, `_fileChangeCallback`) and the
 * event-based `return this.onWatchEvent`. With both present the second
 * `return` is unreachable — confirm which of the two is meant to remain.
 *
 * @param options Watcher options; kept on the instance as `_options`.
 */
watch(options: IWatcherOptions & IChockidarWatcherOptions): Event<watcherCommon.IRawFileChange[] | IWatchError> {
this._options = options;
// Prototype-less object used as a map from watch path to watcher entry.
this._watchers = Object.create(null);
this._watcherCount = 0;
// (c, e, p): only the error callback `e` and the progress callback `p` are used here.
this._watcherPromise = new TPromise<void>((c, e, p) => {
// On error: stop the service first, then reject the promise with the error.
this._errorCallback = (error) => {
this.stop();
e(error);
};
// File change batches are delivered through the promise's progress callback.
this._fileChangeCallback = p;
}, () => {
// Second constructor argument — presumably the cancel handler (TODO confirm); it stops the service.
this.stop();
});
return this._watcherPromise;
return this.onWatchEvent;
}

public setRoots(requests: IWatcherRequest[]): TPromise<void> {
Expand Down Expand Up @@ -233,7 +225,7 @@ export class ChokidarWatcherService implements IWatcherService {

// Broadcast to clients normalized
const res = watcherCommon.normalize(events);
this._fileChangeCallback(res);
this._onWatchEvent.fire(res);

// Logging
if (this._options.verboseLogging) {
Expand All @@ -257,7 +249,8 @@ export class ChokidarWatcherService implements IWatcherService {
if ((<any>error).code === 'ENOSPC') {
if (!this.enospcErrorLogged) {
this.enospcErrorLogged = true;
this._errorCallback(new Error('Inotify limit reached (ENOSPC)'));
this.stop();
this._onWatchEvent.fire({ message: 'Inotify limit reached (ENOSPC)' });
}
} else {
console.error(error.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,15 @@ suite.skip('Chockidar watching', () => {
await pfs.mkdirp(bFolder);
await pfs.mkdirp(b2Folder);

const promise = service.initialize({ verboseLogging: false, pollingInterval: 200 });
promise.then(null,
e => {
console.log('set error', e);
error = e;
},
p => {
if (Array.isArray(p)) {
result.push(...p);
}
const opts = { verboseLogging: false, pollingInterval: 200 };
service.watch(opts)(e => {
if (Array.isArray(e)) {
result.push(...e);
} else {
console.log('set error', e.message);
error = e.message;
}
);

});
});

suiteTeardown(async () => {
Expand Down
10 changes: 8 additions & 2 deletions src/vs/workbench/services/files/node/watcher/unix/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
'use strict';

import { TPromise } from 'vs/base/common/winjs.base';
import { Event } from 'vs/base/common/event';
import { IRawFileChange } from 'vs/workbench/services/files/node/watcher/common';

export interface IWatcherRequest {
basePath: string;
Expand All @@ -16,7 +18,11 @@ export interface IWatcherOptions {
verboseLogging: boolean;
}

/**
 * Error payload that can be delivered through the event returned by
 * `IWatcherService.watch`, as the alternative to a batch of raw file changes.
 */
export interface IWatchError {
/** Description of what went wrong. */
message: string;
}

/**
 * Contract of the file watcher service, implemented both by the actual
 * watcher and by the IPC channel client that proxies to it.
 */
export interface IWatcherService {
// NOTE(review): promise-based entry point; it appears alongside the event-based
// `watch` below — confirm whether both are meant to coexist.
initialize(options: IWatcherOptions): TPromise<void>;
// Returns an event whose payload is either an array of raw file changes or an
// `IWatchError`; listeners have to distinguish the two shapes themselves.
watch(options: IWatcherOptions): Event<IRawFileChange[] | IWatchError>;
// Passes the list of watch requests (each carrying a `basePath`) to the service.
setRoots(roots: IWatcherRequest[]): TPromise<void>;
}
}
19 changes: 12 additions & 7 deletions src/vs/workbench/services/files/node/watcher/unix/watcherIpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,31 @@

import { TPromise } from 'vs/base/common/winjs.base';
import { IChannel } from 'vs/base/parts/ipc/common/ipc';
import { IWatcherRequest, IWatcherService, IWatcherOptions } from 'vs/workbench/services/files/node/watcher/unix/watcher';
import { IWatcherRequest, IWatcherService, IWatcherOptions, IWatchError } from './watcher';
import { Event } from 'vs/base/common/event';
import { IRawFileChange } from 'vs/workbench/services/files/node/watcher/common';

/**
 * IPC channel shape for the watcher service: typed overloads for the known
 * command and event names, followed by generic catch-all overloads.
 */
export interface IWatcherChannel extends IChannel {
call(command: 'initialize', options: IWatcherOptions): TPromise<void>;
// NOTE(review): `WatcherChannelClient.watch` passes an `IWatcherOptions` object as
// the second argument and exposes the payload as `IRawFileChange[] | IWatchError`,
// so the `verboseLogging: boolean` parameter and the `Error` payload declared here
// look out of step with the only visible caller. That call cannot match this
// overload and instead resolves against the generic `listen<T>` below.
listen(event: 'watch', verboseLogging: boolean): Event<IRawFileChange[] | Error>;
// Generic fallback for any event name.
listen<T>(event: string, arg?: any): Event<T>;

call(command: 'setRoots', request: IWatcherRequest[]): TPromise<void>;
call(command: string, arg: any): TPromise<any>;
// Generic fallback for any command name.
call<T>(command: string, arg?: any): TPromise<T>;
}

export class WatcherChannel implements IWatcherChannel {

constructor(private service: IWatcherService) { }

listen<T>(event: string, arg?: any): Event<T> {
/**
 * Maps an event name requested over the channel to the matching service event.
 *
 * @param event Event name; only `'watch'` is handled.
 * @param arg Forwarded unchanged to `service.watch` as its options argument.
 * @returns The event returned by `service.watch`.
 * @throws Error for any event name other than `'watch'`.
 */
listen(event: string, arg?: any): Event<any> {
switch (event) {
case 'watch': return this.service.watch(arg);
}
// Reached for every unrecognised event name.
throw new Error('No events');
}

call(command: string, arg: any): TPromise<any> {
switch (command) {
case 'initialize': return this.service.initialize(arg);
case 'setRoots': return this.service.setRoots(arg);
}
return undefined;
Expand All @@ -37,8 +42,8 @@ export class WatcherChannelClient implements IWatcherService {

constructor(private channel: IWatcherChannel) { }

initialize(options: IWatcherOptions): TPromise<void> {
return this.channel.call('initialize', options);
/**
 * Client-side counterpart of `IWatcherService.watch`: listens to the channel's
 * `'watch'` event, passing `options` along as the event argument.
 *
 * @param options Watcher options sent with the listen request.
 * @returns The event obtained from the channel, typed as raw file changes or a watch error.
 */
watch(options: IWatcherOptions): Event<IRawFileChange[] | IWatchError> {
return this.channel.listen('watch', options);
}

setRoots(roots: IWatcherRequest[]): TPromise<void> {
Expand Down
48 changes: 16 additions & 32 deletions src/vs/workbench/services/files/node/watcher/unix/watcherService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

'use strict';

import { TPromise } from 'vs/base/common/winjs.base';
import { getNextTickChannel } from 'vs/base/parts/ipc/common/ipc';
import { Client } from 'vs/base/parts/ipc/node/ipc.cp';
import uri from 'vs/base/common/uri';
import { toFileChangesEvent, IRawFileChange } from 'vs/workbench/services/files/node/watcher/common';
import { IWatcherChannel, WatcherChannelClient } from 'vs/workbench/services/files/node/watcher/unix/watcherIpc';
import { FileChangesEvent, IFilesConfiguration } from 'vs/platform/files/common/files';
import { IWorkspaceContextService } from 'vs/platform/workspace/common/workspace';
import { isPromiseCanceledError } from 'vs/base/common/errors';
import { IDisposable, dispose } from 'vs/base/common/lifecycle';
import { IConfigurationService } from 'vs/platform/configuration/common/configuration';
import { Schemas } from 'vs/base/common/network';
import { filterEvent } from 'vs/base/common/event';
import { IWatchError } from 'vs/workbench/services/files/node/watcher/unix/watcher';

export class FileWatcher {
private static readonly MAX_RESTARTS = 5;
Expand Down Expand Up @@ -55,20 +55,7 @@ export class FileWatcher {
);
this.toDispose.push(client);

const channel = getNextTickChannel(client.getChannel<IWatcherChannel>('watcher'));
this.service = new WatcherChannelClient(channel);

const options = {
verboseLogging: this.verboseLogging
};

this.service.initialize(options).then(null, err => {
if (!this.isDisposed && !isPromiseCanceledError(err)) {
return TPromise.wrapError(err); // the service lib uses the promise cancel error to indicate the process died, we do not want to bubble this up
}
return void 0;
}, (events: IRawFileChange[]) => this.onRawFileEvents(events)).done(() => {

client.onDidProcessExit(() => {
// our watcher app should never be completed because it keeps on watching. being in here indicates
// that the watcher process died and we want to restart it here. we only do it a max number of times
if (!this.isDisposed) {
Expand All @@ -80,11 +67,19 @@ export class FileWatcher {
this.errorLogger('[FileWatcher] failed to start after retrying for some time, giving up. Please report this as a bug report!');
}
}
}, error => {
if (!this.isDisposed) {
this.errorLogger(error);
}
});
}, null, this.toDispose);

const channel = getNextTickChannel(client.getChannel<IWatcherChannel>('watcher'));
this.service = new WatcherChannelClient(channel);

const options = { verboseLogging: this.verboseLogging };
const onWatchEvent = filterEvent(this.service.watch(options), () => !this.isDisposed);

const onError = filterEvent<any, IWatchError>(onWatchEvent, (e): e is IWatchError => typeof e.message === 'string');
onError(err => this.errorLogger(err.message), null, this.toDispose);

const onFileChanges = filterEvent<any, IRawFileChange[]>(onWatchEvent, (e): e is IRawFileChange[] => Array.isArray(e) && e.length > 0);
onFileChanges(e => this.onFileChanges(toFileChangesEvent(e)), null, this.toDispose);

// Start watching
this.updateFolders();
Expand Down Expand Up @@ -123,17 +118,6 @@ export class FileWatcher {
}));
}

/**
 * Forwards a batch of raw watcher events to the `onFileChanges` callback,
 * converted with `toFileChangesEvent`.
 *
 * Nothing is forwarded once the watcher has been disposed, or when the batch
 * is empty.
 *
 * @param events Raw file changes reported by the watcher.
 */
private onRawFileEvents(events: IRawFileChange[]): void {
	// The disposal check comes first, so `events` is not touched after dispose.
	if (this.isDisposed || !(events.length > 0)) {
		return;
	}

	// Emit through event emitter
	const changes = toFileChangesEvent(events);
	this.onFileChanges(changes);
}

private dispose(): void {
this.isDisposed = true;
this.toDispose = dispose(this.toDispose);
Expand Down