diff --git a/pom.xml b/pom.xml index 288ece8deb294848ff19e5c58c747600f37e4c7a..bacac753799a9f49ac0dff4d9fd4494269bd730c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ <groupId>au.edu.unimelb.mf</groupId> <artifactId>unimelb-mf-clients</artifactId> - <version>0.4.4</version> + <version>0.4.5</version> <packaging>jar</packaging> <name>unimelb-mf-clients</name> <url>https://gitlab.unimelb.edu.au/resplat-mediaflux/unimelb-mf-clients</url> diff --git a/src/main/java/unimelb/mf/client/sync/MFSyncApp.java b/src/main/java/unimelb/mf/client/sync/MFSyncApp.java index 4f59e058b5b7f147fa1f03ca960c8454563e2b35..6e9bd5bf5411c82ceeba0a3e6a177a2aa3bca062 100644 --- a/src/main/java/unimelb/mf/client/sync/MFSyncApp.java +++ b/src/main/java/unimelb/mf/client/sync/MFSyncApp.java @@ -36,6 +36,8 @@ import arc.xml.XmlStringWriter; import unimelb.mf.client.App; import unimelb.mf.client.file.PosixAttributes; import unimelb.mf.client.sync.check.AssetItem; +import unimelb.mf.client.sync.check.CheckHandler; +import unimelb.mf.client.sync.check.CheckResult; import unimelb.mf.client.sync.check.ChecksumType; import unimelb.mf.client.sync.settings.Job; import unimelb.mf.client.sync.settings.downstream.AssetNamespaceDownloadCheckJob; @@ -67,870 +69,881 @@ import unimelb.utils.TimeUtils; public abstract class MFSyncApp extends AbstractMFApp<unimelb.mf.client.sync.settings.Settings> implements Runnable { - private unimelb.mf.client.sync.settings.Settings _settings; - private ThreadPoolExecutor _workers; - private ThreadPoolExecutor _queriers; - private Timer _daemonTimer; - private Thread _daemonListener; - private ServerSocket _daemonListenerSocket; - - private AtomicLong _nbFiles = new AtomicLong(0); - private AtomicLong _nbUploadedFiles = new AtomicLong(0); - private AtomicLong _nbUploadedZeroSizeFiles = new AtomicLong(0); - private AtomicLong _nbSkippedFiles = new AtomicLong(0); - private AtomicLong _nbFailedFiles = new AtomicLong(0); - private AtomicLong _nbUploadedBytes = new AtomicLong(0); - private AtomicInteger _nbDeletedFiles = new AtomicInteger(0); - - private AtomicLong _nbAssets = new AtomicLong(0); - private AtomicLong _nbDownloadedAssets = new AtomicLong(0); - private AtomicLong _nbDownloadedZeroSizeAssets = new AtomicLong(0); - private AtomicLong _nbSkippedAssets = new AtomicLong(0); - private AtomicLong _nbFailedAssets = new AtomicLong(0); - private AtomicLong _nbDownloadedBytes = new AtomicLong(0); - private AtomicInteger _nbDeletedAssets = new AtomicInteger(0); - - private DataTransferListener<Path, String> _ul; - private DataTransferListener<String, Path> _dl; - - private Long _stimeLast = null; - private Long _stime = null; - - private Long _execStartTime = null; - - protected MFSyncApp() { - super(); - _settings = new unimelb.mf.client.sync.settings.Settings(); - _ul = new DataTransferListener<Path, String>() { - - @Override - public void transferStarted(Path src, String dst) { - } - - @Override - public void transferFailed(Path src, String dst) { - _nbFailedFiles.getAndIncrement(); - // TODO record failed files. - } - - @Override - public void transferCompleted(Path src, String dst) { - _nbUploadedFiles.getAndIncrement(); - try { - if (Files.size(src) == 0) { - _nbUploadedZeroSizeFiles.getAndIncrement(); - } - } catch (IOException e) { - MFSyncApp.this.logger().log(Level.SEVERE, e.getMessage(), e); - } - } - - @Override - public void transferSkipped(Path src, String dst) { - _nbSkippedFiles.getAndIncrement(); - } - - @Override - public void transferProgressed(Path src, String dst, long increment) { - _nbUploadedBytes.getAndAdd(increment); - } - }; - _dl = new DataTransferListener<String, Path>() { - - @Override - public void transferStarted(String src, Path dst) { - } - - @Override - public void transferFailed(String src, Path dst) { - _nbFailedAssets.getAndIncrement(); - // TODO record failed files. - } - - @Override - public void transferCompleted(String src, Path dst) { - _nbDownloadedAssets.getAndIncrement(); - try { - if (Files.size(dst) == 0) { - _nbDownloadedZeroSizeAssets.getAndIncrement(); - } - } catch (IOException e) { - MFSyncApp.this.logger().log(Level.SEVERE, e.getMessage(), e); - } - } - - @Override - public void transferSkipped(String src, Path dst) { - _nbSkippedAssets.getAndIncrement(); - } - - @Override - public void transferProgressed(String src, Path dst, long increment) { - _nbDownloadedBytes.getAndAdd(increment); - } - }; - } - - @Override - public String description() { - return "The Mediaflux client application to upload, download or check data."; - } - - protected static Logger createDefaultLogger(unimelb.mf.client.sync.settings.Settings settings, String appName) - throws Throwable { - Logger logger = settings.logDirectory() == null ? LoggingUtils.createConsoleLogger() - : LoggingUtils.createFileAndConsoleLogger(settings.logDirectory(), appName, - settings.logFileSizeMB() * 1000000, settings.logFileCount()); - logger.setLevel(settings.verbose() ? Level.ALL : Level.WARNING); - return logger; - } - - protected void preExecute() throws Throwable { - - } - - @Override - public void run() { - try { - execute(); - } catch (Throwable e) { - if (e instanceof InterruptedException) { - logger().log(Level.WARNING, e.getMessage()); - Thread.currentThread().interrupt(); - } else { - logger().log(Level.SEVERE, e.getMessage(), e); - } - } - } - - @Override - public void execute() throws Throwable { - - preExecute(); - - // take down the current system time. - _execStartTime = System.currentTimeMillis(); - - if (!settings().hasJobs()) { - throw new Exception("No job found!"); - } - if (settings().hasOnlyCheckJobs()) { - // Check jobs do not need to run into daemon mode. - settings().setDaemon(false); - } - - _queriers = new ThreadPoolExecutor(settings().numberOfQueriers(), settings().numberOfQueriers(), 0, - TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, applicationName() + ".querier"); - } - }, new ThreadPoolExecutor.CallerRunsPolicy()); - - _workers = new ThreadPoolExecutor(settings().numberOfWorkers(), settings().numberOfWorkers(), 0, - TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, applicationName() + ".worker"); - } - }, new ThreadPoolExecutor.CallerRunsPolicy()); - - try { - - session().startPingServerPeriodically(60000); - - // starts the daemon regardless whether is recurring execution or - // not. As the listener socket is useful for the one-off execution - // during the execution. Note: the daemon timer will not started for - // one-off execution. - startDaemon(); - - submitJobs(); - - if (!settings().daemon() && (settings().needToDeleteAssets() || settings().needToDeleteFiles())) { - // waiting until the threadpools are clear. This is - // required for sync jobs. - while (_queriers.getActiveCount() > 0 || !_queriers.getQueue().isEmpty() - || _workers.getActiveCount() > 0 || !_workers.getQueue().isEmpty()) { - Thread.sleep(1000); - } - if (settings().deleteAssets()) { - syncDeleteAssets(); - } - if (settings().deleteFiles()) { - syncDeleteFiles(); - } - } - } catch (Throwable e) { - logger().log(Level.SEVERE, e.getMessage(), e); - } finally { - if (!settings().daemon()) { - // shutdown the thread pools and wait until they complete - shutdown(true); - // stop the daemon listener (socket). The daemon timer was not - // started. - stopDaemon(); - // post execution procedure to handle the results etc. - postExecute(); - } - } - } - - public void startDaemon() throws Throwable { - if (settings().daemon()) { - // Recurring execution requires a timer. - if (_daemonTimer == null) { - _daemonTimer = new Timer(); - _daemonTimer.schedule(new TimerTask() { - - @Override - public void run() { - if (!settings().hasJobs() || settings().hasOnlyCheckJobs()) { - logger().info("No transfer jobs found! Stopping..."); - interrupt(); - } - if (_queriers.getActiveCount() == 0 && _queriers.getQueue().isEmpty() - && _workers.getActiveCount() == 0 && _workers.getQueue().isEmpty()) { - try { - if (settings().deleteAssets()) { - syncDeleteAssets(); - } - if (settings().deleteFiles()) { - syncDeleteFiles(); - } - // waiting until the thread pools are clear. - // This is required for sync jobs, and the - // situation, where there are both download and - // upload jobs - while (_queriers.getActiveCount() > 0 || !_queriers.getQueue().isEmpty() - || _workers.getActiveCount() > 0 || !_workers.getQueue().isEmpty()) { - // wait until threadpools are clear. - Thread.sleep(1000); - } - submitJobs(); - } catch (Throwable e) { - logger().log(Level.SEVERE, e.getMessage(), e); - } - } - } - }, settings().daemonScanInterval(), settings().daemonScanInterval()); - } - } - if (_daemonListener == null) { - // Both recurring execution and one-off execution can benefit from - // the listener. - _daemonListener = new Thread(new Runnable() { - - @Override - public void run() { - try { - try { - _daemonListenerSocket = new ServerSocket(_settings.daemonListenerPort(), 0, - InetAddress.getByName(null)); - } catch (BindException be) { - _daemonListenerSocket = new ServerSocket(0, 0, InetAddress.getByName(null)); - String msg = "Local port: " + _settings.daemonListenerPort() - + " is already in use. Use available port: " + _daemonListenerSocket.getLocalPort() - + " instead."; - logger().warning(msg); - } - if (settings().daemon()) { - logger().info("Listening to local port: " + _daemonListenerSocket.getLocalPort() - + ". You can run command 'echo status | nc localhost " - + _daemonListenerSocket.getLocalPort() + "' to check the progress."); - } - try { - outerloop: while (!Thread.interrupted() && !_daemonListenerSocket.isClosed()) { - Socket client = _daemonListenerSocket.accept(); - try { - BufferedReader in = new BufferedReader( - new InputStreamReader(client.getInputStream())); - while (!Thread.interrupted()) { - String cmd = in.readLine(); - if ("stop".equalsIgnoreCase(cmd)) { - interrupt(); - break outerloop; - } else if ("status".equalsIgnoreCase(cmd)) { - printSummary(new PrintStream(client.getOutputStream(), true)); - break; - } else { - break; - } - } - } finally { - client.close(); - } - } - } catch (SocketException se) { - if (settings().verbose()) { - logger().info("Listening socket closed!"); - } - } finally { - _daemonListenerSocket.close(); - } - } catch (Throwable e) { - logger().log(Level.SEVERE, e.getMessage(), e); - } - } - - }, applicationName().toLowerCase() + ".daemon.listener"); - _daemonListener.start(); - } - } - - protected String generateSummary() { - - StringBuilder sb = new StringBuilder(); - long durationMillis = System.currentTimeMillis() - _execStartTime; - List<Job> jobs = settings().jobs(); - if (jobs != null && !jobs.isEmpty()) { - sb.append("\n"); - if (settings().hasDownloadJobs()) { - sb.append("Download:\n"); - for (Job job : jobs) { - if (job instanceof AssetNamespaceDownloadJob) { - AssetNamespaceDownloadJob dj = (AssetNamespaceDownloadJob) job; - sb.append(" src(mediaflux): " + dj.srcAssetNamespace()).append("\n"); - sb.append(" dst(directory): " + dj.dstDirectory().toString()).append("\n"); - } else if (job instanceof AssetSetDownloadJob) { - AssetSetDownloadJob dcj = (AssetSetDownloadJob) job; - sb.append(" src(mediaflux): " + dcj.srcAssetPaths().size() + " assets").append("\n"); - sb.append(" dst(directory): " + dcj.dstDirectory().toString()).append("\n"); - } else if (job instanceof AssetNamespaceDownloadCheckJob) { - AssetNamespaceDownloadCheckJob dj = (AssetNamespaceDownloadCheckJob) job; - sb.append(" src(mediaflux): " + dj.srcAssetNamespace()).append("\n"); - sb.append(" dst(directory): " + dj.dstDirectory().toString()).append("\n"); - } - } - } - if (settings().hasUploadJobs()) { - sb.append("Upload:\n"); - for (Job job : jobs) { - if (job instanceof DirectoryUploadJob) { - DirectoryUploadJob uj = (DirectoryUploadJob) job; - sb.append(" src(directory): " + uj.srcDirectory().toString()).append("\n"); - sb.append(" dst(mediaflux): " + uj.dstAssetNamespace()).append("\n"); - } else if (job instanceof FileSetUploadJob) { - FileSetUploadJob uj = (FileSetUploadJob) job; - sb.append(" src(files): " + uj.srcFiles().size() + " files").append("\n"); - sb.append(" dst(mediaflux): " + uj.dstAssetNamespace()).append("\n"); - } else if (job instanceof DirectoryUploadCheckJob) { - DirectoryUploadCheckJob ucj = (DirectoryUploadCheckJob) job; - sb.append(" src(directory): " + ucj.srcDirectory().toString()).append("\n"); - sb.append(" dst(mediaflux): " + ucj.dstAssetNamespace()).append("\n"); - } else if (job instanceof FileSetUploadCheckJob) { - FileSetUploadCheckJob ucj = (FileSetUploadCheckJob) job; - sb.append(" src(files): " + ucj.srcFiles().size() + " files").append("\n"); - sb.append(" dst(mediaflux): " + ucj.dstAssetNamespace()).append("\n"); - } - } - } - sb.append("\n"); - } - sb.append("\n"); - sb.append("Summary:\n"); - if (_settings.daemon()) { - sb.append(String.format(" Up time: %s", TimeUtils.humanReadableDuration(durationMillis))) - .append("\n"); - } else { - sb.append(String.format(" Exec time: %s", TimeUtils.humanReadableDuration(durationMillis))) - .append("\n"); - } - sb.append(String.format(" Worker threads: %d", _settings.numberOfWorkers())).append("\n"); - - sb.append("\n"); - - long totalFiles = _nbFiles.get(); - long totalProcessedFiles = _nbUploadedFiles.get() + _nbSkippedFiles.get() + _nbFailedFiles.get(); - if (totalFiles > 0 || totalProcessedFiles > 0) { - if (_nbUploadedZeroSizeFiles.get() > 0) { - sb.append(String.format(" Uploaded files: %,32d files(%d zero-size)", _nbUploadedFiles.get(), - _nbUploadedZeroSizeFiles.get())).append("\n"); - } else { - sb.append(String.format(" Uploaded files: %,32d files", _nbUploadedFiles.get())).append("\n"); - } - sb.append(String.format(" Skipped files: %,32d files", _nbSkippedFiles.get())).append("\n"); - sb.append(String.format(" Failed files: %,32d files", _nbFailedFiles.get())).append("\n"); - if (!_settings.daemon()) { - sb.append(String.format(" Total files: %,32d files", totalFiles)).append("\n"); - } - sb.append(String.format(" Uploaded bytes: %,32d bytes", _nbUploadedBytes.get())).append("\n"); - if (!_settings.daemon()) { - // @formatter:off + private unimelb.mf.client.sync.settings.Settings _settings; + private ThreadPoolExecutor _workers; + private ThreadPoolExecutor _queriers; + private Timer _daemonTimer; + private Thread _daemonListener; + private ServerSocket _daemonListenerSocket; + + private AtomicLong _nbFiles = new AtomicLong(0); + private AtomicLong _nbUploadedFiles = new AtomicLong(0); + private AtomicLong _nbUploadedZeroSizeFiles = new AtomicLong(0); + private AtomicLong _nbSkippedFiles = new AtomicLong(0); + private AtomicLong _nbFailedFiles = new AtomicLong(0); + private AtomicLong _nbUploadedBytes = new AtomicLong(0); + private AtomicInteger _nbDeletedFiles = new AtomicInteger(0); + + private AtomicLong _nbAssets = new AtomicLong(0); + private AtomicLong _nbDownloadedAssets = new AtomicLong(0); + private AtomicLong _nbDownloadedZeroSizeAssets = new AtomicLong(0); + private AtomicLong _nbSkippedAssets = new AtomicLong(0); + private AtomicLong _nbFailedAssets = new AtomicLong(0); + private AtomicLong _nbDownloadedBytes = new AtomicLong(0); + private AtomicInteger _nbDeletedAssets = new AtomicInteger(0); + + private DataTransferListener<Path, String> _ul; + private DataTransferListener<String, Path> _dl; + + private Long _stimeLast = null; + private Long _stime = null; + + private Long _execStartTime = null; + + protected MFSyncApp() { + super(); + _settings = new unimelb.mf.client.sync.settings.Settings(); + _ul = new DataTransferListener<Path, String>() { + + @Override + public void transferStarted(Path src, String dst) { + } + + @Override + public void transferFailed(Path src, String dst) { + _nbFailedFiles.getAndIncrement(); + // TODO record failed files. + } + + @Override + public void transferCompleted(Path src, String dst) { + _nbUploadedFiles.getAndIncrement(); + try { + if (Files.size(src) == 0) { + _nbUploadedZeroSizeFiles.getAndIncrement(); + } + } catch (IOException e) { + MFSyncApp.this.logger().log(Level.SEVERE, e.getMessage(), e); + } + } + + @Override + public void transferSkipped(Path src, String dst) { + _nbSkippedFiles.getAndIncrement(); + } + + @Override + public void transferProgressed(Path src, String dst, long increment) { + _nbUploadedBytes.getAndAdd(increment); + } + }; + _dl = new DataTransferListener<String, Path>() { + + @Override + public void transferStarted(String src, Path dst) { + } + + @Override + public void transferFailed(String src, Path dst) { + _nbFailedAssets.getAndIncrement(); + // TODO record failed files. + } + + @Override + public void transferCompleted(String src, Path dst) { + _nbDownloadedAssets.getAndIncrement(); + try { + if (Files.size(dst) == 0) { + _nbDownloadedZeroSizeAssets.getAndIncrement(); + } + } catch (IOException e) { + MFSyncApp.this.logger().log(Level.SEVERE, e.getMessage(), e); + } + } + + @Override + public void transferSkipped(String src, Path dst) { + _nbSkippedAssets.getAndIncrement(); + } + + @Override + public void transferProgressed(String src, Path dst, long increment) { + _nbDownloadedBytes.getAndAdd(increment); + } + }; + } + + @Override + public String description() { + return "The Mediaflux client application to upload, download or check data."; + } + + protected static Logger createDefaultLogger(unimelb.mf.client.sync.settings.Settings settings, String appName) + throws Throwable { + Logger logger = settings.logDirectory() == null ? LoggingUtils.createConsoleLogger() + : LoggingUtils.createFileAndConsoleLogger(settings.logDirectory(), appName, + settings.logFileSizeMB() * 1000000, settings.logFileCount()); + logger.setLevel(settings.verbose() ? Level.ALL : Level.WARNING); + return logger; + } + + protected void preExecute() throws Throwable { + + } + + @Override + public void run() { + try { + execute(); + } catch (Throwable e) { + if (e instanceof InterruptedException) { + logger().log(Level.WARNING, e.getMessage()); + Thread.currentThread().interrupt(); + } else { + logger().log(Level.SEVERE, e.getMessage(), e); + } + } + } + + @Override + public void execute() throws Throwable { + + preExecute(); + + // take down the current system time. + _execStartTime = System.currentTimeMillis(); + + if (!settings().hasJobs()) { + throw new Exception("No job found!"); + } + if (settings().hasOnlyCheckJobs()) { + // Check jobs do not need to run into daemon mode. + settings().setDaemon(false); + } + + _queriers = new ThreadPoolExecutor(settings().numberOfQueriers(), settings().numberOfQueriers(), 0, + TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, applicationName() + ".querier"); + } + }, new ThreadPoolExecutor.CallerRunsPolicy()); + + _workers = new ThreadPoolExecutor(settings().numberOfWorkers(), settings().numberOfWorkers(), 0, + TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, applicationName() + ".worker"); + } + }, new ThreadPoolExecutor.CallerRunsPolicy()); + + try { + + session().startPingServerPeriodically(60000); + + // starts the daemon regardless whether is recurring execution or + // not. As the listener socket is useful for the one-off execution + // during the execution. Note: the daemon timer will not started for + // one-off execution. + startDaemon(); + + submitJobs(); + + if (!settings().daemon() && (settings().needToDeleteAssets() || settings().needToDeleteFiles())) { + // waiting until the threadpools are clear. This is + // required for sync jobs. + while (_queriers.getActiveCount() > 0 || !_queriers.getQueue().isEmpty() + || _workers.getActiveCount() > 0 || !_workers.getQueue().isEmpty()) { + Thread.sleep(1000); + } + if (settings().deleteAssets()) { + syncDeleteAssets(); + } + if (settings().deleteFiles()) { + syncDeleteFiles(); + } + } + } catch (Throwable e) { + logger().log(Level.SEVERE, e.getMessage(), e); + } finally { + if (!settings().daemon()) { + // shutdown the thread pools and wait until they complete + shutdown(true); + // stop the daemon listener (socket). The daemon timer was not + // started. + stopDaemon(); + // post execution procedure to handle the results etc. + postExecute(); + } + } + } + + public void startDaemon() throws Throwable { + if (settings().daemon()) { + // Recurring execution requires a timer. + if (_daemonTimer == null) { + _daemonTimer = new Timer(); + _daemonTimer.schedule(new TimerTask() { + + @Override + public void run() { + if (!settings().hasJobs() || settings().hasOnlyCheckJobs()) { + logger().info("No transfer jobs found! Stopping..."); + interrupt(); + } + if (_queriers.getActiveCount() == 0 && _queriers.getQueue().isEmpty() + && _workers.getActiveCount() == 0 && _workers.getQueue().isEmpty()) { + try { + if (settings().deleteAssets()) { + syncDeleteAssets(); + } + if (settings().deleteFiles()) { + syncDeleteFiles(); + } + // waiting until the thread pools are clear. + // This is required for sync jobs, and the + // situation, where there are both download and + // upload jobs + while (_queriers.getActiveCount() > 0 || !_queriers.getQueue().isEmpty() + || _workers.getActiveCount() > 0 || !_workers.getQueue().isEmpty()) { + // wait until threadpools are clear. + Thread.sleep(1000); + } + submitJobs(); + } catch (Throwable e) { + logger().log(Level.SEVERE, e.getMessage(), e); + } + } + } + }, settings().daemonScanInterval(), settings().daemonScanInterval()); + } + } + if (_daemonListener == null) { + // Both recurring execution and one-off execution can benefit from + // the listener. + _daemonListener = new Thread(new Runnable() { + + @Override + public void run() { + try { + try { + _daemonListenerSocket = new ServerSocket(_settings.daemonListenerPort(), 0, + InetAddress.getByName(null)); + } catch (BindException be) { + _daemonListenerSocket = new ServerSocket(0, 0, InetAddress.getByName(null)); + String msg = "Local port: " + _settings.daemonListenerPort() + + " is already in use. Use available port: " + _daemonListenerSocket.getLocalPort() + + " instead."; + logger().warning(msg); + } + if (settings().daemon()) { + logger().info("Listening to local port: " + _daemonListenerSocket.getLocalPort() + + ". You can run command 'echo status | nc localhost " + + _daemonListenerSocket.getLocalPort() + "' to check the progress."); + } + try { + outerloop: while (!Thread.interrupted() && !_daemonListenerSocket.isClosed()) { + Socket client = _daemonListenerSocket.accept(); + try { + BufferedReader in = new BufferedReader( + new InputStreamReader(client.getInputStream())); + while (!Thread.interrupted()) { + String cmd = in.readLine(); + if ("stop".equalsIgnoreCase(cmd)) { + interrupt(); + break outerloop; + } else if ("status".equalsIgnoreCase(cmd)) { + printSummary(new PrintStream(client.getOutputStream(), true)); + break; + } else { + break; + } + } + } finally { + client.close(); + } + } + } catch (SocketException se) { + if (settings().verbose()) { + logger().info("Listening socket closed!"); + } + } finally { + _daemonListenerSocket.close(); + } + } catch (Throwable e) { + logger().log(Level.SEVERE, e.getMessage(), e); + } + } + + }, applicationName().toLowerCase() + ".daemon.listener"); + _daemonListener.start(); + } + } + + protected String generateSummary() { + + StringBuilder sb = new StringBuilder(); + long durationMillis = System.currentTimeMillis() - _execStartTime; + List<Job> jobs = settings().jobs(); + if (jobs != null && !jobs.isEmpty()) { + sb.append("\n"); + if (settings().hasDownloadJobs()) { + sb.append("Download:\n"); + for (Job job : jobs) { + if (job instanceof AssetNamespaceDownloadJob) { + AssetNamespaceDownloadJob dj = (AssetNamespaceDownloadJob) job; + sb.append(" src(mediaflux): " + dj.srcAssetNamespace()).append("\n"); + sb.append(" dst(directory): " + dj.dstDirectory().toString()).append("\n"); + } else if (job instanceof AssetSetDownloadJob) { + AssetSetDownloadJob dcj = (AssetSetDownloadJob) job; + sb.append(" src(mediaflux): " + dcj.srcAssetPaths().size() + " assets").append("\n"); + sb.append(" dst(directory): " + dcj.dstDirectory().toString()).append("\n"); + } else if (job instanceof AssetNamespaceDownloadCheckJob) { + AssetNamespaceDownloadCheckJob dj = (AssetNamespaceDownloadCheckJob) job; + sb.append(" src(mediaflux): " + dj.srcAssetNamespace()).append("\n"); + sb.append(" dst(directory): " + dj.dstDirectory().toString()).append("\n"); + } + } + } + if (settings().hasUploadJobs()) { + sb.append("Upload:\n"); + for (Job job : jobs) { + if (job instanceof DirectoryUploadJob) { + DirectoryUploadJob uj = (DirectoryUploadJob) job; + sb.append(" src(directory): " + uj.srcDirectory().toString()).append("\n"); + sb.append(" dst(mediaflux): " + uj.dstAssetNamespace()).append("\n"); + } else if (job instanceof FileSetUploadJob) { + FileSetUploadJob uj = (FileSetUploadJob) job; + sb.append(" src(files): " + uj.srcFiles().size() + " files").append("\n"); + sb.append(" dst(mediaflux): " + uj.dstAssetNamespace()).append("\n"); + } else if (job instanceof DirectoryUploadCheckJob) { + DirectoryUploadCheckJob ucj = (DirectoryUploadCheckJob) job; + sb.append(" src(directory): " + ucj.srcDirectory().toString()).append("\n"); + sb.append(" dst(mediaflux): " + ucj.dstAssetNamespace()).append("\n"); + } else if (job instanceof FileSetUploadCheckJob) { + FileSetUploadCheckJob ucj = (FileSetUploadCheckJob) job; + sb.append(" src(files): " + ucj.srcFiles().size() + " files").append("\n"); + sb.append(" dst(mediaflux): " + ucj.dstAssetNamespace()).append("\n"); + } + } + } + sb.append("\n"); + } + sb.append("\n"); + sb.append("Summary:\n"); + if (_settings.daemon()) { + sb.append(String.format(" Up time: %s", TimeUtils.humanReadableDuration(durationMillis))) + .append("\n"); + } else { + sb.append(String.format(" Exec time: %s", TimeUtils.humanReadableDuration(durationMillis))) + .append("\n"); + } + sb.append(String.format(" Worker threads: %d", _settings.numberOfWorkers())).append("\n"); + + sb.append("\n"); + + long totalFiles = _nbFiles.get(); + long totalProcessedFiles = _nbUploadedFiles.get() + _nbSkippedFiles.get() + _nbFailedFiles.get(); + if (totalFiles > 0 || totalProcessedFiles > 0) { + if (_nbUploadedZeroSizeFiles.get() > 0) { + sb.append(String.format(" Uploaded files: %,32d files(%d zero-size)", _nbUploadedFiles.get(), + _nbUploadedZeroSizeFiles.get())).append("\n"); + } else { + sb.append(String.format(" Uploaded files: %,32d files", _nbUploadedFiles.get())).append("\n"); + } + sb.append(String.format(" Skipped files: %,32d files", _nbSkippedFiles.get())).append("\n"); + sb.append(String.format(" Failed files: %,32d files", _nbFailedFiles.get())).append("\n"); + if (!_settings.daemon()) { + sb.append(String.format(" Total files: %,32d files", totalFiles)).append("\n"); + } + sb.append(String.format(" Uploaded bytes: %,32d bytes", _nbUploadedBytes.get())).append("\n"); + if (!_settings.daemon()) { + // @formatter:off sb.append(String.format(" Upload speed: %,32.3f MB/s", (double) _nbUploadedBytes.get() / 1000.0 / ((double) durationMillis))).append("\n"); // @formatter:on - } - sb.append("\n"); - if (!_settings.daemon()) { - if (totalFiles != totalProcessedFiles) { - sb.append("\nWARNING: processed " + totalProcessedFiles + " files out of total of " + totalFiles - + ". Please check for errors in the log file.\n"); - } - } - } - int deletedAssets = _nbDeletedAssets.get(); - if (deletedAssets > 0) { - sb.append(String.format(" Deleted Assets: %,32d assets", _nbDeletedAssets.get())).append("\n"); - sb.append("\n"); - } - - long totalAssets = _nbAssets.get(); - long totalProcessedAssets = _nbDownloadedAssets.get() + _nbSkippedAssets.get() + _nbFailedAssets.get(); - if (totalAssets > 0 || totalProcessedAssets > 0) { - if (_nbDownloadedZeroSizeAssets.get() > 0) { - sb.append(String.format(" Downloaded assets: %,32d files(%d zero-size)", _nbDownloadedAssets.get(), - _nbDownloadedZeroSizeAssets.get())).append("\n"); - } else { - sb.append(String.format(" Downloaded assets: %,32d files", _nbDownloadedAssets.get())).append("\n"); - } - sb.append(String.format(" Skipped assets: %,32d files", _nbSkippedAssets.get())).append("\n"); - sb.append(String.format(" Failed assets: %,32d files", _nbFailedAssets.get())).append("\n"); - if (!_settings.daemon()) { - sb.append(String.format(" Total assets: %,32d files", totalAssets)).append("\n"); - } - sb.append(String.format(" Downloaded bytes: %,32d bytes", _nbDownloadedBytes.get())).append("\n"); - if (!_settings.daemon()) { - // @formatter:off + } + sb.append("\n"); + if (!_settings.daemon()) { + if (totalFiles != totalProcessedFiles) { + sb.append("\nWARNING: processed " + totalProcessedFiles + " files out of total of " + totalFiles + + ". Please check for errors in the log file.\n"); + } + } + } + int deletedAssets = _nbDeletedAssets.get(); + if (deletedAssets > 0) { + sb.append(String.format(" Deleted Assets: %,32d assets", _nbDeletedAssets.get())).append("\n"); + sb.append("\n"); + } + + long totalAssets = _nbAssets.get(); + long totalProcessedAssets = _nbDownloadedAssets.get() + _nbSkippedAssets.get() + _nbFailedAssets.get(); + if (totalAssets > 0 || totalProcessedAssets > 0) { + if (_nbDownloadedZeroSizeAssets.get() > 0) { + sb.append(String.format(" Downloaded assets: %,32d files(%d zero-size)", _nbDownloadedAssets.get(), + _nbDownloadedZeroSizeAssets.get())).append("\n"); + } else { + sb.append(String.format(" Downloaded assets: %,32d files", _nbDownloadedAssets.get())).append("\n"); + } + sb.append(String.format(" Skipped assets: %,32d files", _nbSkippedAssets.get())).append("\n"); + sb.append(String.format(" Failed assets: %,32d files", _nbFailedAssets.get())).append("\n"); + if (!_settings.daemon()) { + sb.append(String.format(" Total assets: %,32d files", totalAssets)).append("\n"); + } + sb.append(String.format(" Downloaded bytes: %,32d bytes", _nbDownloadedBytes.get())).append("\n"); + if (!_settings.daemon()) { + // @formatter:off sb.append(String.format(" Download speed: %,32.3f MB/s", (double) _nbDownloadedBytes.get() / 1000.0 / ((double) durationMillis))).append("\n"); // @formatter:on - } - sb.append("\n"); - if (!_settings.daemon()) { - if (totalAssets != totalProcessedAssets) { - sb.append("\nWARNING: processed " + totalProcessedAssets + " assets out of total of " + totalAssets - + ". Please check for errors in the log file.\n"); - } - } - } - int deletedFiles = _nbDeletedFiles.get(); - if (deletedFiles > 0) { - sb.append(String.format(" Deleted files: %,32d files", _nbDeletedFiles.get())).append("\n"); - sb.append("\n"); - } - return sb.toString(); - } - - protected void printSummary(PrintStream ps) { - ps.print(generateSummary()); - } - - public void stopDaemon() { - if (_daemonTimer != null) { - _daemonTimer.cancel(); - _daemonTimer = null; - } - if (_daemonListener != null) { - try { - if (_daemonListenerSocket != null) { - _daemonListenerSocket.close(); - } - } catch (IOException e) { - logger().log(Level.SEVERE, e.getMessage(), e); - } finally { - _daemonListener.interrupt(); - } - } - } - - private void submitJobs() throws Throwable { - if (_stime != null) { - _stimeLast = _stime; - } - _stime = session().execute("server.clock.time").longValue("stime", null); - List<Job> jobs = settings().jobs(); - if (jobs != null) { - for (Job job : jobs) { - switch (job.action()) { - case DOWNLOAD: - submitDownloadJob((DownloadJob) job); - break; - case UPLOAD: - submitUploadJob((UploadJob) job); - break; - case CHECK_DOWNLOAD: - submitDownloadCheckJob((DownloadCheckJob) job); - break; - case CHECK_UPLOAD: - submitUploadCheckJob((UploadCheckJob) job); - break; - default: - break; - } - } - } - } - - private void submitDownloadCheckJob(DownloadCheckJob job) throws Throwable { - if (job instanceof AssetNamespaceDownloadCheckJob) { - AssetNamespaceDownloadCheckJob dcj = (AssetNamespaceDownloadCheckJob) job; - int idx = 1; - long remaining = Long.MAX_VALUE; - long total = -1; - while (remaining > 0 && !Thread.interrupted()) { - - long from = idx; - long to = idx - 1 + settings().batchSize(); - logger().info("checking " + settings().batchSize() + " assets (" + from + "~" + to - + (total >= 0 ? ("/" + total) : "") + ")..."); - - XmlStringWriter w = new XmlStringWriter(); - w.add("where", "namespace>='" + dcj.srcAssetNamespace() + "' and asset has content"); - w.add("action", "get-value"); - w.add("count", true); - w.add("size", settings().batchSize()); - w.add("idx", idx); - w.add("xpath", new String[] { "ename", "path" }, - "string.format('%s/%s', xvalue('namespace'), choose(equals(xvalue('name'),null()), string.format('__asset_id__%s',xvalue('@id')),xvalue('name')))"); - w.add("xpath", new String[] { "ename", "csize" }, "content/size"); - w.add("xpath", new String[] { "ename", "csum" }, "content/csum"); - w.add("xpath", new String[] { "ename", "mtime" }, "meta/" + PosixAttributes.DOC_TYPE + "/mtime"); - XmlDoc.Element re = session().execute("asset.query", w.document()); - List<XmlDoc.Element> aes = re.elements("asset"); - if (aes != null && !aes.isEmpty()) { - List<AssetItem> ais = new ArrayList<AssetItem>(aes.size()); - for (XmlDoc.Element ae : aes) { - String assetPath = ae.value("path"); - long assetContentSize = ae.longValue("csize", -1); - String assetCsum = ae.value("csum"); - AssetItem ai = new AssetItem(assetPath, dcj.srcAssetNamespace(), assetContentSize, assetCsum, - ChecksumType.CRC32); - ais.add(ai); - } - _queriers.submit(new AssetSetCheckTask(session(), logger(), ais, dcj, settings().csumCheck(), - settings().checkHandler(), _workers)); - } - if (idx == 1) { - total = re.longValue("cursor/total"); - _nbAssets.getAndAdd(total); - settings().checkHandler().addTotal(total); - } - remaining = re.longValue("cursor/remaining"); - idx += settings().batchSize(); - } - } else { - throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); - } - } - - private void submitUploadCheckJob(UploadCheckJob job) throws Throwable { - if (job instanceof DirectoryUploadCheckJob) { - DirectoryUploadCheckJob ucj = (DirectoryUploadCheckJob) job; - List<Path> files = new ArrayList<Path>(settings().batchSize()); - Files.walkFileTree(ucj.srcDirectory(), - settings().followDirLinks() ? EnumSet.of(FileVisitOption.FOLLOW_LINKS) - : EnumSet.noneOf(FileVisitOption.class), - Integer.MAX_VALUE, new SimpleFileVisitor<Path>() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - try { - if (!Files.isRegularFile(file)) { - logger().warning("Skipped '" + file - + "'. Not a regular file. Could be a symbolic link to directory."); - return FileVisitResult.CONTINUE; - } - if (ucj.accept(file)) { - files.add(file); - _nbFiles.getAndIncrement(); - settings().checkHandler().addTotal(1); - if (files.size() >= settings().batchSize()) { - // check files - _queriers.submit(new FileSetCheckTask(session(), logger(), - new ArrayList<Path>(files), ucj, settings().csumCheck(), - settings().checkHandler(), _workers)); - files.clear(); - } - } - } catch (Throwable e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - return FileVisitResult.TERMINATE; - } - logger().log(Level.SEVERE, e.getMessage(), e); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult visitFileFailed(Path file, IOException ioe) { - logger().log(Level.SEVERE, "Failed to access file: " + file, ioe); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException ioe) { - if (ioe != null) { - logger().log(Level.SEVERE, ioe.getMessage(), ioe); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) - throws IOException { - return super.preVisitDirectory(dir, attrs); - } - }); - if (!files.isEmpty()) { - // check files - _queriers.submit(new FileSetCheckTask(session(), logger(), new ArrayList<Path>(files), ucj, - settings().csumCheck(), settings().checkHandler(), _workers)); - files.clear(); - } - } else if (job instanceof FileSetUploadCheckJob) { - FileSetUploadCheckJob ucj = (FileSetUploadCheckJob) job; - - int nfs = ucj.srcFiles() == null ? 0 : ucj.srcFiles().size(); - _nbFiles.getAndAdd(nfs); - settings().checkHandler().addTotal(nfs); - - _queriers.submit(new FileSetCheckTask(session(), logger(), new ArrayList<Path>(ucj.srcFiles()), ucj, - settings().csumCheck(), settings().checkHandler(), _workers)); - } else { - throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); - } - } - - private void submitDownloadJob(DownloadJob job) throws Throwable { - if (job instanceof AssetNamespaceDownloadJob) { - - // add total assets - AssetNamespaceDownloadJob nsj = (AssetNamespaceDownloadJob) job; - logger().info("counting the number of assets to download..."); - long nba = AssetNamespaceUtils.countAssetsWithContent(session(), nsj.srcAssetNamespace()); - _nbAssets.getAndAdd(nba); - - // submit - _queriers.submit(new AssetNamespaceDownloadTask(session(), logger(), nsj, _stimeLast, - settings().batchSize(), settings().overwrite(), settings().unarchive(), _workers, _dl)); - } else if (job instanceof AssetSetDownloadJob) { - - // add total assets - AssetSetDownloadJob asj = (AssetSetDownloadJob) job; - Collection<String> srcAssetPaths = asj.srcAssetPaths(); - if (srcAssetPaths != null) { - _nbAssets.getAndAdd(srcAssetPaths.size()); - } - - _queriers.submit(new AssetSetDownloadTask(session(), logger(), (AssetSetDownloadJob) job, - settings().batchSize(), settings().overwrite(), settings().unarchive(), _workers, _dl)); - } else { - throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); - } - } - - private void submitUploadJob(UploadJob job) throws Throwable { - - if (job instanceof DirectoryUploadJob) { - DirectoryUploadJob duj = (DirectoryUploadJob) job; - - // Find content store of the dst namespace, to be used by - // Input.setStore(). This is required to enable cluster I/O - duj.resolveDstAssetNamespaceStore(session()); - - List<Path> files = new ArrayList<Path>(settings().batchSize()); - Files.walkFileTree(duj.srcDirectory(), - settings().followDirLinks() ? EnumSet.of(FileVisitOption.FOLLOW_LINKS) - : EnumSet.noneOf(FileVisitOption.class), - Integer.MAX_VALUE, new SimpleFileVisitor<Path>() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - try { - if (!Files.isRegularFile(file)) { - logger().warning("Skipped '" + file - + "'. Not a regular file. Could be a symbolic link to directory."); - return FileVisitResult.CONTINUE; - } - if (duj.accept(file)) { - files.add(file); - _nbFiles.getAndIncrement(); - if (files.size() >= settings().batchSize()) { - _queriers.submit(new FileSetUploadTask(session(), logger(), - new ArrayList<Path>(files), duj, settings().csumCheck(), - settings().retry(), _ul, _workers)); - files.clear(); - } - } - } catch (Throwable e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - return FileVisitResult.TERMINATE; - } - logger().log(Level.SEVERE, e.getMessage(), e); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult visitFileFailed(Path file, IOException ioe) { - logger().log(Level.SEVERE, "Failed to access file: " + file, ioe); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException ioe) { - if (ioe != null) { - logger().log(Level.SEVERE, ioe.getMessage(), ioe); - } - try { - if (FileUtils.isEmptyDirectory(dir)) { - _workers.submit( - new AssetNamespaceCreateTask(session(), logger(), duj.transformPath(dir))); - } - } catch (IOException e) { - logger().log(Level.SEVERE, ioe.getMessage(), e); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) - throws IOException { - return super.preVisitDirectory(dir, attrs); - } - }); - if (!files.isEmpty()) { - _queriers.submit(new FileSetUploadTask(session(), logger(), new ArrayList<Path>(files), duj, - settings().csumCheck(), settings().retry(), _ul, _workers)); - files.clear(); - } - } else if (job instanceof FileSetUploadJob) { - FileSetUploadJob uj = (FileSetUploadJob) job; - List<Path> srcFiles = uj.srcFiles() == null ? null : new ArrayList<Path>(uj.srcFiles()); - if (srcFiles != null) { - _nbFiles.getAndAdd(srcFiles.size()); - int batchSize = settings().batchSize(); - int nbBatches = srcFiles.size() / batchSize; - int remainder = srcFiles.size() % batchSize; - if (remainder != 0) { - nbBatches++; - } - - for (int i = 0; i < nbBatches; i++) { - int from = i * batchSize; - int to = i * batchSize + ((remainder == 0 || i < nbBatches - 1) ? batchSize : remainder); - List<Path> files = srcFiles.subList(from, to); - _queriers.submit(new FileSetUploadTask(session(), logger(), new ArrayList<Path>(files), uj, - settings().csumCheck(), settings().retry(), _ul, _workers)); - - } - } - - } else { - throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); - } - } - - private void syncDeleteAssets() throws Throwable { - List<Job> jobs = settings().jobs(); - for (Job job : jobs) { - if (job instanceof DirectoryUploadJob) { - _queriers.submit(new SyncDeleteAssetsTask(session(), logger(), (DirectoryUploadJob) job, settings(), - _nbDeletedAssets)); - } - } - } - - private void syncDeleteFiles() throws Throwable { - List<Job> jobs = settings().jobs(); - for (Job job : jobs) { - if (job instanceof AssetNamespaceDownloadJob) { - _queriers.submit(new SyncDeleteFilesTask(session(), logger(), (AssetNamespaceDownloadJob) job, - settings(), _workers, _nbDeletedFiles)); - } - } - } - - public void interrupt() { - if (_queriers != null && !_queriers.isShutdown()) { - _queriers.shutdownNow(); - } - if (_workers != null && !_workers.isShutdown()) { - _workers.shutdownNow(); - } - stopDaemon(); - - // NOTE: We must discard the session because connection pooling is - // enabled. - session().discard(); - } - - public void shutdown(boolean wait) { - try { - if (_queriers != null) { - if (!_queriers.isShutdown()) { - // now all jobs have been submitted - _queriers.shutdown(); - } - if (wait) { - // wait until all tasks are submitted by queriers. - _queriers.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); - } - } - - if (_workers != null) { - if (!_workers.isShutdown()) { - // now all tasks have been submitted - _workers.shutdown(); - } - if (wait) { - // wait until all tasks are processed by workers. - _workers.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); - } - } - - // NOTE: We must discard the session because connection pooling is - // enabled. - session().discard(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger().info("Interrupted '" + Thread.currentThread().getName() + "' thread(id=" - + Thread.currentThread().getId() + ")."); - } catch (Throwable e) { - logger().log(Level.SEVERE, e.getMessage(), e); - } - } - - protected void postExecute() { - if (!settings().hasOnlyCheckJobs()) { - String summary = generateSummary(); - if (settings().logDirectory() != null) { - logger().info(summary); - } - if (settings().logDirectory() == null || !settings().verbose()) { - System.out.print(summary); - } - - if (settings().hasRecipients()) { - notifySummary(summary); - } - } - } - - @Override - public unimelb.mf.client.sync.settings.Settings settings() { - return _settings; - } - - public void notifySummary(String summary) { - try { - String subject = applicationName() + " results [" + new Date() + "]"; - notify(subject, summary); - } catch (Throwable e) { - logger().log(Level.SEVERE, e.getMessage(), e); - } - } - - public void notify(String subject, String message) throws Throwable { - if (settings().hasRecipients()) { - MailUtils.sendMail(session(), settings().recipients(), subject, message, true); - } - } - - public void printVersion() { - System.out.println( - String.format("%s %s (build-time: %s)", this.applicationName(), App.version(), App.buildTime())); - } + } + sb.append("\n"); + if (!_settings.daemon()) { + if (totalAssets != totalProcessedAssets) { + sb.append("\nWARNING: processed " + totalProcessedAssets + " assets out of total of " + totalAssets + + ". Please check for errors in the log file.\n"); + } + } + } + int deletedFiles = _nbDeletedFiles.get(); + if (deletedFiles > 0) { + sb.append(String.format(" Deleted files: %,32d files", _nbDeletedFiles.get())).append("\n"); + sb.append("\n"); + } + return sb.toString(); + } + + protected void printSummary(PrintStream ps) { + ps.print(generateSummary()); + } + + public void stopDaemon() { + if (_daemonTimer != null) { + _daemonTimer.cancel(); + _daemonTimer = null; + } + if (_daemonListener != null) { + try { + if (_daemonListenerSocket != null) { + _daemonListenerSocket.close(); + } + } catch (IOException e) { + logger().log(Level.SEVERE, e.getMessage(), e); + } finally { + _daemonListener.interrupt(); + } + } + } + + private void submitJobs() throws Throwable { + if (_stime != null) { + _stimeLast = _stime; + } + _stime = session().execute("server.clock.time").longValue("stime", null); + List<Job> jobs = settings().jobs(); + if (jobs != null) { + for (Job job : jobs) { + switch (job.action()) { + case DOWNLOAD: + submitDownloadJob((DownloadJob) job); + break; + case UPLOAD: + submitUploadJob((UploadJob) job); + break; + case CHECK_DOWNLOAD: + submitDownloadCheckJob((DownloadCheckJob) job); + break; + case CHECK_UPLOAD: + submitUploadCheckJob((UploadCheckJob) job); + break; + default: + break; + } + } + } + } + + private void submitDownloadCheckJob(DownloadCheckJob job) throws Throwable { + if (job instanceof AssetNamespaceDownloadCheckJob) { + AssetNamespaceDownloadCheckJob dcj = (AssetNamespaceDownloadCheckJob) job; + int idx = 1; + long remaining = Long.MAX_VALUE; + long total = -1; + while (remaining > 0 && !Thread.interrupted()) { + + long from = idx; + long to = idx - 1 + settings().batchSize(); + logger().info("checking " + settings().batchSize() + " assets (" + from + "~" + to + + (total >= 0 ? ("/" + total) : "") + ")..."); + + XmlStringWriter w = new XmlStringWriter(); + w.add("where", "namespace>='" + dcj.srcAssetNamespace() + "' and asset has content"); + w.add("action", "get-value"); + w.add("count", true); + w.add("size", settings().batchSize()); + w.add("idx", idx); + w.add("xpath", new String[] { "ename", "path" }, + "string.format('%s/%s', xvalue('namespace'), choose(equals(xvalue('name'),null()), string.format('__asset_id__%s',xvalue('@id')),xvalue('name')))"); + w.add("xpath", new String[] { "ename", "csize" }, "content/size"); + w.add("xpath", new String[] { "ename", "csum" }, "content/csum"); + w.add("xpath", new String[] { "ename", "mtime" }, "meta/" + PosixAttributes.DOC_TYPE + "/mtime"); + XmlDoc.Element re = session().execute("asset.query", w.document()); + List<XmlDoc.Element> aes = re.elements("asset"); + if (aes != null && !aes.isEmpty()) { + List<AssetItem> ais = new ArrayList<AssetItem>(aes.size()); + for (XmlDoc.Element ae : aes) { + String assetPath = ae.value("path"); + long assetContentSize = ae.longValue("csize", -1); + String assetCsum = ae.value("csum"); + AssetItem ai = new AssetItem(assetPath, dcj.srcAssetNamespace(), assetContentSize, assetCsum, + ChecksumType.CRC32); + ais.add(ai); + } + _queriers.submit(new AssetSetCheckTask(session(), logger(), ais, dcj, settings().csumCheck(), + settings().checkHandler(), _workers)); + } + if (idx == 1) { + total = re.longValue("cursor/total"); + _nbAssets.getAndAdd(total); + settings().checkHandler().addTotal(total); + } + remaining = re.longValue("cursor/remaining"); + idx += settings().batchSize(); + } + } else { + throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); + } + } + + private void submitUploadCheckJob(UploadCheckJob job) throws Throwable { + if (job instanceof DirectoryUploadCheckJob) { + DirectoryUploadCheckJob ucj = (DirectoryUploadCheckJob) job; + List<Path> files = new ArrayList<Path>(settings().batchSize()); + Files.walkFileTree(ucj.srcDirectory(), + settings().followDirLinks() ? EnumSet.of(FileVisitOption.FOLLOW_LINKS) + : EnumSet.noneOf(FileVisitOption.class), + Integer.MAX_VALUE, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + try { + if (!Files.isRegularFile(file)) { + logger().warning("Skipped '" + file + + "'. Not a regular file. Could be a symbolic link to directory."); + return FileVisitResult.CONTINUE; + } + if (ucj.accept(file)) { + files.add(file); + _nbFiles.getAndIncrement(); + settings().checkHandler().addTotal(1); + if (files.size() >= settings().batchSize()) { + // check files + _queriers.submit(new FileSetCheckTask(session(), logger(), + new ArrayList<Path>(files), ucj, settings().csumCheck(), + settings().checkHandler(), _workers)); + files.clear(); + } + } + } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + return FileVisitResult.TERMINATE; + } + logger().log(Level.SEVERE, e.getMessage(), e); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException ioe) { + logger().log(Level.SEVERE, "Failed to access file: " + file, ioe); + if (ucj.accept(file)) { + String assetPath = ucj.transformPath(file); + CheckHandler ch = settings().checkHandler(); + ch.checked(new CheckResult(file, assetPath, false, false, false, + settings().csumCheck() ? false : null)); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException ioe) { + if (ioe != null) { + logger().log(Level.SEVERE, "[postVisitDirectory] " + dir + ": " + ioe.getMessage(), + ioe); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + return super.preVisitDirectory(dir, attrs); + } + }); + if (!files.isEmpty()) { + // check files + _queriers.submit(new FileSetCheckTask(session(), logger(), new ArrayList<Path>(files), ucj, + settings().csumCheck(), settings().checkHandler(), _workers)); + files.clear(); + } + } else if (job instanceof FileSetUploadCheckJob) { + FileSetUploadCheckJob ucj = (FileSetUploadCheckJob) job; + + int nfs = ucj.srcFiles() == null ? 0 : ucj.srcFiles().size(); + _nbFiles.getAndAdd(nfs); + settings().checkHandler().addTotal(nfs); + + _queriers.submit(new FileSetCheckTask(session(), logger(), new ArrayList<Path>(ucj.srcFiles()), ucj, + settings().csumCheck(), settings().checkHandler(), _workers)); + } else { + throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); + } + } + + private void submitDownloadJob(DownloadJob job) throws Throwable { + if (job instanceof AssetNamespaceDownloadJob) { + + // add total assets + AssetNamespaceDownloadJob nsj = (AssetNamespaceDownloadJob) job; + logger().info("counting the number of assets to download..."); + long nba = AssetNamespaceUtils.countAssetsWithContent(session(), nsj.srcAssetNamespace()); + _nbAssets.getAndAdd(nba); + + // submit + _queriers.submit(new AssetNamespaceDownloadTask(session(), logger(), nsj, _stimeLast, + settings().batchSize(), settings().overwrite(), settings().unarchive(), _workers, _dl)); + } else if (job instanceof AssetSetDownloadJob) { + + // add total assets + AssetSetDownloadJob asj = (AssetSetDownloadJob) job; + Collection<String> srcAssetPaths = asj.srcAssetPaths(); + if (srcAssetPaths != null) { + _nbAssets.getAndAdd(srcAssetPaths.size()); + } + + _queriers.submit(new AssetSetDownloadTask(session(), logger(), (AssetSetDownloadJob) job, + settings().batchSize(), settings().overwrite(), settings().unarchive(), _workers, _dl)); + } else { + throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); + } + } + + private void submitUploadJob(UploadJob job) throws Throwable { + + if (job instanceof DirectoryUploadJob) { + DirectoryUploadJob duj = (DirectoryUploadJob) job; + + // Find content store of the dst namespace, to be used by + // Input.setStore(). This is required to enable cluster I/O + duj.resolveDstAssetNamespaceStore(session()); + + List<Path> files = new ArrayList<Path>(settings().batchSize()); + Files.walkFileTree(duj.srcDirectory(), + settings().followDirLinks() ? EnumSet.of(FileVisitOption.FOLLOW_LINKS) + : EnumSet.noneOf(FileVisitOption.class), + Integer.MAX_VALUE, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + try { + if (!Files.isRegularFile(file)) { + logger().warning("Skipped '" + file + + "'. Not a regular file. Could be a symbolic link to directory."); + return FileVisitResult.CONTINUE; + } + if (duj.accept(file)) { + files.add(file); + _nbFiles.getAndIncrement(); + if (files.size() >= settings().batchSize()) { + _queriers.submit(new FileSetUploadTask(session(), logger(), + new ArrayList<Path>(files), duj, settings().csumCheck(), + settings().retry(), _ul, _workers)); + files.clear(); + } + } + } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + return FileVisitResult.TERMINATE; + } + logger().log(Level.SEVERE, e.getMessage(), e); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException ioe) { + logger().log(Level.SEVERE, "Failed to access file: " + file, ioe); + if (duj.accept(file)) { + String assetPath = duj.transformPath(file); + _ul.transferFailed(file, assetPath); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException ioe) { + if (ioe != null) { + logger().log(Level.SEVERE, ioe.getMessage(), ioe); + } + try { + if (FileUtils.isEmptyDirectory(dir)) { + _workers.submit( + new AssetNamespaceCreateTask(session(), logger(), duj.transformPath(dir))); + } + } catch (IOException e) { + logger().log(Level.SEVERE, ioe.getMessage(), e); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + return super.preVisitDirectory(dir, attrs); + } + }); + if (!files.isEmpty()) { + _queriers.submit(new FileSetUploadTask(session(), logger(), new ArrayList<Path>(files), duj, + settings().csumCheck(), settings().retry(), _ul, _workers)); + files.clear(); + } + } else if (job instanceof FileSetUploadJob) { + FileSetUploadJob uj = (FileSetUploadJob) job; + List<Path> srcFiles = uj.srcFiles() == null ? null : new ArrayList<Path>(uj.srcFiles()); + if (srcFiles != null) { + _nbFiles.getAndAdd(srcFiles.size()); + int batchSize = settings().batchSize(); + int nbBatches = srcFiles.size() / batchSize; + int remainder = srcFiles.size() % batchSize; + if (remainder != 0) { + nbBatches++; + } + + for (int i = 0; i < nbBatches; i++) { + int from = i * batchSize; + int to = i * batchSize + ((remainder == 0 || i < nbBatches - 1) ? batchSize : remainder); + List<Path> files = srcFiles.subList(from, to); + _queriers.submit(new FileSetUploadTask(session(), logger(), new ArrayList<Path>(files), uj, + settings().csumCheck(), settings().retry(), _ul, _workers)); + + } + } + + } else { + throw new UnsupportedOperationException("Unexpected job type: " + job.getClass().getName()); + } + } + + private void syncDeleteAssets() throws Throwable { + List<Job> jobs = settings().jobs(); + for (Job job : jobs) { + if (job instanceof DirectoryUploadJob) { + _queriers.submit(new SyncDeleteAssetsTask(session(), logger(), (DirectoryUploadJob) job, settings(), + _nbDeletedAssets)); + } + } + } + + private void syncDeleteFiles() throws Throwable { + List<Job> jobs = settings().jobs(); + for (Job job : jobs) { + if (job instanceof AssetNamespaceDownloadJob) { + _queriers.submit(new SyncDeleteFilesTask(session(), logger(), (AssetNamespaceDownloadJob) job, + settings(), _workers, _nbDeletedFiles)); + } + } + } + + public void interrupt() { + if (_queriers != null && !_queriers.isShutdown()) { + _queriers.shutdownNow(); + } + if (_workers != null && !_workers.isShutdown()) { + _workers.shutdownNow(); + } + stopDaemon(); + + // NOTE: We must discard the session because connection pooling is + // enabled. + session().discard(); + } + + public void shutdown(boolean wait) { + try { + if (_queriers != null) { + if (!_queriers.isShutdown()) { + // now all jobs have been submitted + _queriers.shutdown(); + } + if (wait) { + // wait until all tasks are submitted by queriers. + _queriers.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + } + } + + if (_workers != null) { + if (!_workers.isShutdown()) { + // now all tasks have been submitted + _workers.shutdown(); + } + if (wait) { + // wait until all tasks are processed by workers. + _workers.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + } + } + + // NOTE: We must discard the session because connection pooling is + // enabled. + session().discard(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger().info("Interrupted '" + Thread.currentThread().getName() + "' thread(id=" + + Thread.currentThread().getId() + ")."); + } catch (Throwable e) { + logger().log(Level.SEVERE, e.getMessage(), e); + } + } + + protected void postExecute() { + if (!settings().hasOnlyCheckJobs()) { + String summary = generateSummary(); + if (settings().logDirectory() != null) { + logger().info(summary); + } + if (settings().logDirectory() == null || !settings().verbose()) { + System.out.print(summary); + } + + if (settings().hasRecipients()) { + notifySummary(summary); + } + } + } + + @Override + public unimelb.mf.client.sync.settings.Settings settings() { + return _settings; + } + + public void notifySummary(String summary) { + try { + String subject = applicationName() + " results [" + new Date() + "]"; + notify(subject, summary); + } catch (Throwable e) { + logger().log(Level.SEVERE, e.getMessage(), e); + } + } + + public void notify(String subject, String message) throws Throwable { + if (settings().hasRecipients()) { + MailUtils.sendMail(session(), settings().recipients(), subject, message, true); + } + } + + public void printVersion() { + System.out.println( + String.format("%s %s (build-time: %s)", this.applicationName(), App.version(), App.buildTime())); + } } diff --git a/src/main/java/unimelb/mf/client/sync/task/FileCsumCalcTask.java b/src/main/java/unimelb/mf/client/sync/task/FileCsumCalcTask.java index 03d522d3cd297abbd942a533fda885baf46e1b8d..d6b80f35d5621167a595eaa0f5738ff3206b45c6 100644 --- a/src/main/java/unimelb/mf/client/sync/task/FileCsumCalcTask.java +++ b/src/main/java/unimelb/mf/client/sync/task/FileCsumCalcTask.java @@ -32,7 +32,7 @@ public class FileCsumCalcTask extends AbstractMFTask { try { csum = ChecksumUtils.get(_file.toFile(), _csumType); } catch (Throwable e) { - e.printStackTrace(); + logError("Cannot generate checksum: Failed to read file: '" + _file + "'", e); } _csumHandler.processChecksum(_file, csum, _csumType); } diff --git a/src/main/java/unimelb/mf/client/sync/task/FileSetCheckTask.java b/src/main/java/unimelb/mf/client/sync/task/FileSetCheckTask.java index b468c45fe5812b75fb10414a32bc5d1b914505db..7927cff184dea303f73ed68338c7219ac28b04f5 100644 --- a/src/main/java/unimelb/mf/client/sync/task/FileSetCheckTask.java +++ b/src/main/java/unimelb/mf/client/sync/task/FileSetCheckTask.java @@ -94,22 +94,15 @@ public class FileSetCheckTask extends AbstractMFTask { AssetItem ai = new AssetItem(ae, _job.dstAssetNamespace()); Path file = _assetFiles.get(assetPath); - // TODO: DEBUG if (file == null) { - System.err.println("DEBUG: file path is null. Unexpected null file path retrieved by asset path: '" - + assetPath + "'"); - _assetFiles.forEach((ap, f) -> { - System.err.println("DEBUG: Asset: " + ap + " | File: " + f); - }); - System.exit(1); + throw new AssertionError("Could not find file matching asset: '" + assetPath + "'"); } Long fileSize = null; try { fileSize = Files.size(file); } catch (Throwable e) { - System.err.println("Error: fail to retrieve length of file: '" + file + "'"); - e.printStackTrace(); + logError("Failed to read file size. File: '" + file + "'", e); } // @formatter:off @@ -124,20 +117,12 @@ public class FileSetCheckTask extends AbstractMFTask { if (!fileSizesMatch) { _rh.checked(new CheckResult(file, ai.assetPath(), true, true, false, _csumCheck ? false : null)); - incCompletedOperations(); - return; - } - - if (fileSize > 0 && contentCsum == null) { + } else if (fileSize > 0 && contentCsum == null) { // not content checksum available. report as mismatch, could be false alarm. But // at least it's an issue. _rh.checked(new CheckResult(file, ai.assetPath(), true, true, fileSizesMatch, _csumCheck ? false : null)); - incCompletedOperations(); - return; - } - - if (_csumCheck) { + } else if (_csumCheck) { if (fileSize == 0 && (contentSize == null || contentSize == 0)) { _rh.checked(new CheckResult(file, ai.assetPath(), true, true, fileSizesMatch, true)); } else { diff --git a/src/main/java/unimelb/mf/client/sync/task/FileSetUploadTask.java b/src/main/java/unimelb/mf/client/sync/task/FileSetUploadTask.java index 4c78bdc91f1442a0819535795da56b66bd280d93..6e9ae59546c40b855cbb041a31e6ed9711fadcdc 100644 --- a/src/main/java/unimelb/mf/client/sync/task/FileSetUploadTask.java +++ b/src/main/java/unimelb/mf/client/sync/task/FileSetUploadTask.java @@ -107,18 +107,20 @@ public class FileSetUploadTask extends AbstractMFTask { Long assetCSum = assetCSumStr == null ? null : Long.parseLong(assetCSumStr, 16); Path file = _assetFiles.get(assetPath); - - // TODO: DEBUG if (file == null) { - System.err.println("DEBUG: file path is null. Unexpected null file path retrieved by asset path: '" - + assetPath + "'"); - _assetFiles.forEach((ap, f) -> { - System.err.println("DEBUG: Asset: " + ap + " | File: " + f); - }); - System.exit(1); + throw new AssertionError("Could not find file matching asset: '" + assetPath + "'"); + } + + Long fileSize = null; + try { + fileSize = Files.size(file); + } catch (Throwable e) { + logError("Failed to read file size. File: '" + file + "'", e); + _ul.transferFailed(file, assetPath); + continue; } - boolean fileSizesMatch = contentSize == null ? false : (contentSize == Files.size(file)); + boolean fileSizesMatch = contentSize == null ? false : (contentSize == fileSize); if (!fileSizesMatch) { // upload file if size does not match diff --git a/src/main/java/unimelb/utils/FileCount.java b/src/main/java/unimelb/utils/FileCount.java new file mode 100644 index 0000000000000000000000000000000000000000..36a6b677d4515ab332dce83a0c30011539718f2d --- /dev/null +++ b/src/main/java/unimelb/utils/FileCount.java @@ -0,0 +1,400 @@ +package unimelb.utils; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.ConsoleHandler; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; + +import unimelb.utils.FileCount.Result; + +public class FileCount implements Callable<Result> { + + public static class Result { + public final Path rootDirectory; + public final long nbFiles; + public final long nbFailedFiles; + public final long nbFileSymlinks; + public final long nbRegularFiles; + public final long nbNonRegularFiles; + public final long totalFileSize; + public final long totalRegularFileSize; + public final long totalNonRegularFileSize; + public final long nbDirectories; + public final long nbFailedDirectories; + + private Map<Long, Long> _thresholdedCounts; + + public Result(Path rootDir, long nbFiles, long nbFailedFiles, long nbFileSymlinks, long nbRegularFiles, + long nbNonRegularFiles, long totalFileSize, long totalRegularFileSize, long totalNonRegularFileSize, + long nbDirectories, long nbFailedDirectories, Map<Long, Long> thresholdedCounts) { + this.rootDirectory = rootDir; + this.nbFiles = nbFiles; + this.nbFailedFiles = nbFailedFiles; + this.nbFileSymlinks = nbFileSymlinks; + this.nbRegularFiles = nbRegularFiles; + this.nbNonRegularFiles = nbNonRegularFiles; + this.totalFileSize = totalFileSize; + this.totalRegularFileSize = totalRegularFileSize; + this.totalNonRegularFileSize = totalNonRegularFileSize; + this.nbDirectories = nbDirectories; + this.nbFailedDirectories = nbFailedDirectories; + if (thresholdedCounts != null && !thresholdedCounts.isEmpty()) { + _thresholdedCounts = new TreeMap<Long, Long>(); + _thresholdedCounts.putAll(thresholdedCounts); + } + } + + public void print(PrintStream o) { + o.println(); + o.println(String.format("%44s: %32s", "Root directory", this.rootDirectory)); + + o.println(); + o.println(String.format("%44s: %,32d", "File count(failed)", this.nbFailedFiles)); + o.println(String.format("%44s: %,32d", "File count(symlinks)", this.nbFileSymlinks)); + o.println(String.format("%44s: %,32d", "File count(regular)", this.nbRegularFiles)); + o.println(String.format("%44s: %,32d", "File count(non-regular)", this.nbNonRegularFiles)); + o.println(String.format("%44s: %,32d", "File count(total)", this.nbFiles)); + + if (_thresholdedCounts != null && !_thresholdedCounts.isEmpty()) { + o.println(); + List<Long> ts = new ArrayList<Long>(_thresholdedCounts.keySet()); + for (int i = 0; i < ts.size(); i++) { + long t = ts.get(i); + long c = _thresholdedCounts.get(t); + if (i == 0) { + if (t == 0) { + o.println(String.format("%44s: %,32d", "Files(size: 0)", c)); + } else { + o.println(String.format("%44s: %,32d", String.format("Files(size: [0, %d])", t), c)); + } + } else { + long pt = ts.get(i - 1); + long pc = _thresholdedCounts.get(pt); + o.println( + String.format("%44s: %,32d", String.format("Files(size: [%d,%d])", pt + 1, t), c - pc)); + } + if (i == ts.size() - 1) { + o.println(String.format("%44s: %,32d", String.format("Files(size: [%d,∞])", t), + this.nbFiles - c)); + } + } + } + + o.println(); + o.println(String.format("%44s: %,32d", "File size(regular)", this.totalRegularFileSize)); + o.println(String.format("%44s: %,32d", "File size(non-regular)", this.totalNonRegularFileSize)); + o.println(String.format("%44s: %,32d", "File size(total)", this.totalFileSize)); + + o.println(); + o.println(String.format("%44s: %,32d", "Directory count(failed)", this.nbFailedDirectories)); + o.println(String.format("%44s: %,32d", "Directory count(total)", this.nbDirectories)); + o.println(); + + } + } + + private static final Logger logger = Logger.getLogger(FileCount.class.getName()); + static { + ConsoleHandler handler = new ConsoleHandler(); + handler.setFormatter(new SimpleFormatter()); + handler.setLevel(Level.ALL); + logger.addHandler(handler); + logger.setLevel(Level.ALL); + } + + private Path _rootDir; + private boolean _followLinks; + private Map<Long, AtomicLong> _thresholds; + + private AtomicLong _nbFiles; + private AtomicLong _nbFailedFiles; + private AtomicLong _nbFileSymlinks; + private AtomicLong _nbRegularFiles; + private AtomicLong _nbNonRegularFiles; + private AtomicLong _totalFileSize; + private AtomicLong _totalRegularFileSize; + private AtomicLong _totalNonRegularFileSize; + private AtomicLong _nbDirs; + private AtomicLong _nbFailedDirs; + + public FileCount(Path rootDir, boolean followLinks, long... thresholds) throws Throwable { + _rootDir = rootDir; + _followLinks = followLinks; + if (thresholds != null && thresholds.length > 0) { + _thresholds = new TreeMap<Long, AtomicLong>(); + for (long threshold : thresholds) { + _thresholds.put(threshold, new AtomicLong(0L)); + } + } + + _nbFiles = new AtomicLong(0L); + _nbFailedFiles = new AtomicLong(0L); + _nbFileSymlinks = new AtomicLong(0L); + _nbRegularFiles = new AtomicLong(0L); + _nbNonRegularFiles = new AtomicLong(0L); + _totalFileSize = new AtomicLong(0L); + _totalRegularFileSize = new AtomicLong(0L); + _totalNonRegularFileSize = new AtomicLong(0L); + _nbDirs = new AtomicLong(0L); + _nbFailedDirs = new AtomicLong(0L); + } + + public Path rootDir() { + return _rootDir; + } + + public boolean followLinks() { + return _followLinks; + } + + public long[] thresholds() { + if (_thresholds != null && !_thresholds.isEmpty()) { + Set<Long> ts = _thresholds.keySet(); + long[] thresholds = new long[ts.size()]; + int i = 0; + for (Long t : ts) { + thresholds[i] = t; + ++i; + } + return thresholds; + } + return null; + } + + public long nbFiles() { + return _nbFiles.get(); + } + + public long nbFailedFiles() { + return _nbFailedFiles.get(); + } + + public long nbFileSymlinks() { + return _nbFileSymlinks.get(); + } + + public long nbRegularFiles() { + return _nbRegularFiles.get(); + } + + public long nbNonRegularFiles() { + return _nbNonRegularFiles.get(); + } + + public long totalFileSize() { + return _totalFileSize.get(); + } + + public long totalRegularFileSize() { + return _totalRegularFileSize.get(); + } + + public long totalNonRegularFileSize() { + return _totalNonRegularFileSize.get(); + } + + public long nbDirectories() { + return _nbDirs.get(); + } + + public long nbFailedDirectories() { + return _nbFailedDirs.get(); + } + + private void updateThresholdSize(long fileSize) { + if (_thresholds == null || _thresholds.isEmpty()) { + return; + } + _thresholds.forEach((t, s) -> { + if (fileSize <= t) { + s.getAndIncrement(); + } + }); + } + + @Override + public Result call() throws Exception { + Files.walkFileTree(_rootDir, + _followLinks ? EnumSet.of(FileVisitOption.FOLLOW_LINKS) : EnumSet.noneOf(FileVisitOption.class), + Integer.MAX_VALUE, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + _nbFiles.getAndIncrement(); + System.out + .println(String.format("%d: '%s'", _nbFiles.get(), file.normalize().toAbsolutePath())); + if (Files.isSymbolicLink(file)) { + _nbFileSymlinks.getAndIncrement(); + } else { + long fileSize = Files.size(file); + _totalFileSize.getAndAdd(fileSize); + updateThresholdSize(fileSize); + if (Files.isRegularFile(file)) { + _nbRegularFiles.getAndIncrement(); + _totalRegularFileSize.getAndAdd(fileSize); + } else { + _nbNonRegularFiles.getAndIncrement(); + _totalNonRegularFileSize.getAndAdd(fileSize); + } + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException ioe) { + if (Files.isDirectory(file)) { + _nbFailedDirs.getAndIncrement(); + } else { + _nbFailedFiles.getAndIncrement(); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException ioe) { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + _nbDirs.getAndIncrement(); + return super.preVisitDirectory(dir, attrs); + } + }); + Map<Long, Long> thresholdedCounts = new TreeMap<Long, Long>(); + if (_thresholds != null && !_thresholds.isEmpty()) { + _thresholds.forEach((t, c) -> { + thresholdedCounts.put(t, c.get()); + }); + } + return new Result(_rootDir, nbFiles(), nbFailedFiles(), nbFileSymlinks(), nbRegularFiles(), nbNonRegularFiles(), + totalFileSize(), totalRegularFileSize(), totalNonRegularFileSize(), nbDirectories(), + nbFailedDirectories(), thresholdedCounts); + } + + private static long[] parseThresholds(String ts) { + if (ts != null) { + ts = ts.replace(" ", "").replaceAll(",$", "").replaceAll("^,", ""); + if (!ts.isEmpty()) { + String[] ss = ts.split(","); + long[] ls = new long[ss.length]; + for (int i = 0; i < ss.length; i++) { + ls[i] = parseSize(ss[i]); + } + return ls; + } + } + return null; + } + + private static long parseSize(String s) { + long size; + s = s.toLowerCase(); + if (s.endsWith("b")) { + size = Long.parseLong(s.substring(0, s.length() - 1)); + } else if (s.endsWith("k")) { + size = Long.parseLong(s.substring(0, s.length() - 1)) * 1000L; + } else if (s.endsWith("m")) { + size = Long.parseLong(s.substring(0, s.length() - 1)) * 1000000L; + } else if (s.endsWith("g")) { + size = Long.parseLong(s.substring(0, s.length() - 1)) * 1000000000L; + } else if (s.endsWith("t")) { + size = Long.parseLong(s.substring(0, s.length() - 1)) * 1000000000000L; + } else { + size = Long.parseLong(s); + } + if (size < 0) { + throw new IllegalArgumentException("Invalid file size threshold: " + size); + } + return size; + } + + public static Result execute(Path rootDir, boolean followLinks, long... thresholds) throws Throwable { + return new FileCount(rootDir, followLinks, thresholds).call(); + } + + public static int executeCommand(String[] args) throws Throwable { + Boolean followLinks = null; + long[] thresholds = null; + Path rootDir = null; + try { + for (int i = 0; i < args.length;) { + if (args[i].equalsIgnoreCase("--follow-links")) { + if (followLinks != null) { + throw new IllegalArgumentException("Multiple --follow-links found!"); + } + followLinks = true; + i++; + } else if (args[i].equalsIgnoreCase("--thresholds")) { + if (thresholds != null) { + throw new IllegalArgumentException("Multiple --thresholds found!"); + } + thresholds = parseThresholds(args[i + 1]); + i += 2; + } else { + if (args[i].startsWith("-")) { + throw new IllegalArgumentException("Unknown option: " + args[i]); + } + if (rootDir != null) { + throw new IllegalArgumentException("Multiple root directory specified. Expects only one."); + } + rootDir = Paths.get(args[i]).normalize().toAbsolutePath(); + i++; + } + } + if (rootDir == null) { + rootDir = Paths.get(System.getProperty("user.dir")); + } + if (followLinks == null) { + followLinks = false; + } + } catch (Throwable e) { + throw new IllegalArgumentException(e); + } + execute(rootDir, followLinks, thresholds).print(System.out); + return 0; + } + + public static void printCommandUsage() { + // @formatter:off + System.out.println(); + System.out.println("Usage: file-count [--follow-links] [--thresholds size[b|k|m|g|t]] [directory]"); + System.out.println(); + System.out.println("Description: count the total number of files and sum up the total file sizes in the specified directory. If the directory is not specified, defaults to current work directory."); + System.out.println(); + System.out.println("Options:"); + System.out.println(" --follow-links follow symbolic links."); + System.out.println(" --thresholds size[k|b|m|g] count files by the specified thresholds. separated by comma. For example 1k,1m,10m,1g,5g"); + System.out.println(); + // @formatter:on + } + + public static void main(String[] args) { + try { + executeCommand(args); + } catch (Throwable e) { + e.printStackTrace(); + if (e instanceof IllegalArgumentException) { + printCommandUsage(); + } + System.exit(1); + } + } + +} diff --git a/src/main/scripts/unix/file-count b/src/main/scripts/unix/file-count new file mode 100644 index 0000000000000000000000000000000000000000..673abf8eed34981d10581af571aed87090c97beb --- /dev/null +++ b/src/main/scripts/unix/file-count @@ -0,0 +1,28 @@ +#!/bin/bash + +# ${ROOT}/bin/ +BIN=$(dirname ${BASH_SOURCE[0]}) + +# current directory +CWD=$(pwd) + +# ${ROOT}/ +ROOT=$(cd ${BIN}/../../ && pwd && cd ${CWD}) + +# ${ROOT}/lib/ +LIB=${ROOT}/lib + +# ${ROOT}/lib/unimelb-mf-clients.jar +JAR=${LIB}/unimelb-mf-clients.jar + +# check if unimelb-mf-clients.jar exists +[[ ! -f $JAR ]] && echo "${JAR} is not found." >&2 && exit 2 + +#export JAVA_HOME=${ROOT}/@JAVA_HOME@ +#export PATH=${JAVA_HOME}/bin:${PATH} + +# check if java exists +[[ -z $(which java) ]] && echo "Java is not found." >&2 && exit 1 + +# execute the command +java -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+UseStringDeduplication -Xmx1g -cp "${JAR}" unimelb.utils.FileCount ${1+"$@"} diff --git a/src/main/scripts/windows/file-count.cmd b/src/main/scripts/windows/file-count.cmd new file mode 100644 index 0000000000000000000000000000000000000000..5373dca16c1f6d0c0d01a3426501f12ab7a7bbc2 --- /dev/null +++ b/src/main/scripts/windows/file-count.cmd @@ -0,0 +1,12 @@ +@echo off + +pushd %~dp0..\..\ +set ROOT=%cd% +popd + +@REM set JAVA_HOME=%ROOT%\@JAVA_HOME@ +@REM set PATH=%JAVA_HOME%\bin;%PATH% + +set JAR=%ROOT%\lib\unimelb-mf-clients.jar + +java -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+UseStringDeduplication -Xmx1g -cp "%JAR%" unimelb.utils.FileCount %*