From 31a2802cd112269f5422f1ee7e600e2f5aa3b647 Mon Sep 17 00:00:00 2001 From: Louis Royer Date: Fri, 3 Apr 2020 22:04:02 +0200 Subject: [PATCH] Fix #105 (#114) Fix #105 --- src/tracker/TrackerManagement.java | 197 +++++++++++++++++++++++------ 1 file changed, 156 insertions(+), 41 deletions(-) diff --git a/src/tracker/TrackerManagement.java b/src/tracker/TrackerManagement.java index 4d1a37a..c91a855 100644 --- a/src/tracker/TrackerManagement.java +++ b/src/tracker/TrackerManagement.java @@ -21,7 +21,8 @@ import protocolP2P.UpdateRatio; import localException.InternalError; import remoteException.EmptyDirectory; import exception.LocalException; - +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; /** Tracker management implementation * @author Louis Royer @@ -37,6 +38,8 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable protected Map ratioDown = new HashMap<>(); protected Map> fileList = new HashMap<>(); protected volatile boolean stop; + protected AtomicBoolean writeLock; + protected AtomicInteger readLock; /** Constructor * @param tracker Tracker HostItem @@ -46,6 +49,8 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable stop = false; this.tracker = tracker; this.logger = logger; + writeLock = new AtomicBoolean(); + readLock = new AtomicInteger(); } /** Handle Discover request @@ -60,7 +65,17 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable } else { String filename = ((DiscoverRequest)p).getFilename(); try { - pd.sendResponse(createProtocolP2PPacket(new DiscoverResponse(filename, fileList.getOrDefault(filename, hostList)))); + synchronized (this) { + while(writeLock.get()) { + this.wait(); + } + readLock.incrementAndGet(); + pd.sendResponse(createProtocolP2PPacket(new DiscoverResponse(filename, fileList.getOrDefault(filename, hostList)))); + readLock.decrementAndGet(); + this.notify(); + } + } catch (InterruptedException e) { + throw new InternalError(); } catch (Exception e) { writeLog(e, LogLevel.Error); } @@ -81,18 +96,29 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable HostItem host = ((RatioRequest)p).getHostItem(); writeLog("Ratio request for host " + host, LogLevel.Debug); try { - if (!hostList.contains(host)) { - String l = ""; - for (HostItem h: hostList) { - l += h + " "; + synchronized (this) { + while(writeLock.get()) { + this.wait(); } - writeLog(host + " is not in hostlist [ " + l + "]", LogLevel.Debug); - pd.sendResponse(createProtocolP2PPacket(new Payload(RequestResponseCode.UNKNOWN_HOST))); - } else { - pd.sendResponse(createProtocolP2PPacket(new RatioResponse(host, - ratioUp.get(host).longValue(), - ratioDown.get(host).longValue()))); + readLock.incrementAndGet(); + if (!hostList.contains(host)) { + String l = ""; + for (HostItem h: hostList) { + l += h + " "; + } + writeLog(host + " is not in hostlist [ " + l + "]", LogLevel.Debug); + pd.sendResponse(createProtocolP2PPacket(new Payload(RequestResponseCode.UNKNOWN_HOST))); + } else { + + pd.sendResponse(createProtocolP2PPacket(new RatioResponse(host, + ratioUp.get(host).longValue(), + ratioDown.get(host).longValue()))); + } + readLock.decrementAndGet(); + this.notify(); } + } catch (InterruptedException e) { + throw new InternalError(); } catch (Exception e) { writeLog(e, LogLevel.Error); } @@ -110,17 +136,31 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable throw new InternalError(); } else { String[] f = ((FileList)p).getFileList(); - for (String file: f) { - List h = fileList.get(file); - if (h != null) { - if (!h.contains(host)) { - h.add(host); + try { + synchronized(this) { + while(writeLock.getAndSet(true)) { + this.wait(); } - } else { - List emptyH = new ArrayList<>(); - emptyH.add(host); - fileList.put(file, emptyH); - } + while(readLock.get() > 0) { + this.wait(); + } + for (String file: f) { + List h = fileList.get(file); + if (h != null) { + if (!h.contains(host)) { + h.add(host); + } + } else { + List emptyH = new ArrayList<>(); + emptyH.add(host); + fileList.put(file, emptyH); + } + } + writeLock.getAndSet(false); + this.notifyAll(); + } + } catch(InterruptedException e) { + throw new InternalError(); } } } @@ -138,14 +178,28 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable } HostItem host = ((Unregister)p).getHostItem(); writeLog("Received UNREGISTER from host " + pd.getHostItem() + ". Removing host " + host, LogLevel.Action); - hostList.remove(host); - for(String f: fileList.keySet()) { - fileList.get(f).remove(host); - if(fileList.get(f).isEmpty()) { - fileList.remove(f); + try { + synchronized (this) { + while(writeLock.getAndSet(true)) { + this.wait(); + } + while(readLock.get() > 0) { + this.wait(); + } + hostList.remove(host); + Map> fileListCpy = new HashMap<>(fileList); + for(String f: fileListCpy.keySet()) { + fileList.get(f).remove(host); + if(fileList.get(f).isEmpty()) { + fileList.remove(f); + } + } + writeLock.getAndSet(false); + this.notifyAll(); } + } catch (InterruptedException e) { + throw new InternalError(); } - /* Note: we do not remove host from ratioUp and ratioDown since Unregistering is only here to indicate * the host is temporary not serving any files (ie. is down). */ @@ -177,11 +231,25 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable HostItem updateRatioClient = updateRatio.getClient(); long ratioSize = updateRatio.getDataSize(); writeLog("Ratio += " + ratioSize + ", client: " + updateRatioClient + " / server: " + updateRatioServer, LogLevel.Debug); - if (!ratioDown.containsKey(updateRatioClient) || ! ratioUp.containsKey(updateRatioServer)) { - sendUnknownHost(pd); - } else { - ratioDown.put(updateRatioClient, Long.valueOf(ratioDown.get(updateRatioClient).longValue() + ratioSize)); - ratioUp.put(updateRatioServer, Long.valueOf(ratioUp.get(updateRatioServer).longValue() + ratioSize)); + try { + synchronized(this) { + while(writeLock.getAndSet(true)) { + this.wait(); + } + while(readLock.get() > 0) { + this.wait(); + } + if (!ratioDown.containsKey(updateRatioClient) || ! ratioUp.containsKey(updateRatioServer)) { + sendUnknownHost(pd); + } else { + ratioDown.put(updateRatioClient, Long.valueOf(ratioDown.get(updateRatioClient).longValue() + ratioSize)); + ratioUp.put(updateRatioServer, Long.valueOf(ratioUp.get(updateRatioServer).longValue() + ratioSize)); + } + writeLock.getAndSet(false); + this.notifyAll(); + } + } catch (InterruptedException e) { + throw new InternalError(); } } @@ -197,13 +265,26 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable } // add host to known host list HostItem host = ((Register)p).getHostItem(); - if (!hostList.contains(host)) { - hostList.add(host); + try { + synchronized (this) { + while(writeLock.getAndSet(true)) { + this.wait(); + } + while(readLock.get() > 0) { + this.wait(); + } + if (!hostList.contains(host)) { + hostList.add(host); + } + // initialize ratios if this is a new host + ratioUp.putIfAbsent(host, Long.valueOf(0)); + ratioDown.putIfAbsent(host, Long.valueOf(0)); + writeLock.getAndSet(false); + this.notifyAll(); + } + } catch(InterruptedException e) { + throw new InternalError(); } - - // initialize ratios if this is a new host - ratioUp.putIfAbsent(host, Long.valueOf(0)); - ratioDown.putIfAbsent(host, Long.valueOf(0)); // send a list request try { @@ -215,11 +296,45 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable closeHostItemSocket(host); } catch (EmptyDirectory e) { writeLog("Empty Directory", LogLevel.Debug); - hostList.remove(host); + try { + synchronized (this) { + while(writeLock.getAndSet(true)) { + this.wait(); + } + while(readLock.get() > 0) { + this.wait(); + } + if (!hostList.contains(host)) { + hostList.add(host); + } + hostList.remove(host); + writeLock.getAndSet(false); + this.notifyAll(); + } + } catch(InterruptedException e2) { + throw new InternalError(); + } writeLog("Received EMPTY DIRECTORY from host " + pd.getHostItem() + ". Aborting.", LogLevel.Action); } catch (Exception e) { // remove from list because list request could not be send - hostList.remove(host); + try { + synchronized (this) { + while(writeLock.getAndSet(true)) { + this.wait(); + } + while(readLock.get() > 0) { + this.wait(); + } + if (!hostList.contains(host)) { + hostList.add(host); + } + hostList.remove(host); + writeLock.getAndSet(false); + this.notifyAll(); + } + } catch(InterruptedException e2) { + throw new InternalError(); + } writeLog("Aborting the add of host " + host, LogLevel.Action); writeLog(e, LogLevel.Error); }