Fix #105 (#114)
flavien's git/Projet_JAVA_P2P_STRI2A/pipeline/head This commit looks good Details

Fix #105
pull/116/head
Louis Royer 4 years ago
parent aae25fbd99
commit 31a2802cd1

@ -21,7 +21,8 @@ import protocolP2P.UpdateRatio;
import localException.InternalError;
import remoteException.EmptyDirectory;
import exception.LocalException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
/** Tracker management implementation
* @author Louis Royer
@ -37,6 +38,8 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
protected Map<HostItem, Long> ratioDown = new HashMap<>();
protected Map<String, List<HostItem>> fileList = new HashMap<>();
protected volatile boolean stop;
protected AtomicBoolean writeLock;
protected AtomicInteger readLock;
/** Constructor
* @param tracker Tracker HostItem
@ -46,6 +49,8 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
stop = false;
this.tracker = tracker;
this.logger = logger;
writeLock = new AtomicBoolean();
readLock = new AtomicInteger();
}
/** Handle Discover request
@ -60,7 +65,17 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
} else {
String filename = ((DiscoverRequest)p).getFilename();
try {
pd.sendResponse(createProtocolP2PPacket(new DiscoverResponse(filename, fileList.getOrDefault(filename, hostList))));
synchronized (this) {
while(writeLock.get()) {
this.wait();
}
readLock.incrementAndGet();
pd.sendResponse(createProtocolP2PPacket(new DiscoverResponse(filename, fileList.getOrDefault(filename, hostList))));
readLock.decrementAndGet();
this.notify();
}
} catch (InterruptedException e) {
throw new InternalError();
} catch (Exception e) {
writeLog(e, LogLevel.Error);
}
@ -81,18 +96,29 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
HostItem host = ((RatioRequest)p).getHostItem();
writeLog("Ratio request for host " + host, LogLevel.Debug);
try {
if (!hostList.contains(host)) {
String l = "";
for (HostItem h: hostList) {
l += h + " ";
synchronized (this) {
while(writeLock.get()) {
this.wait();
}
writeLog(host + " is not in hostlist [ " + l + "]", LogLevel.Debug);
pd.sendResponse(createProtocolP2PPacket(new Payload(RequestResponseCode.UNKNOWN_HOST)));
} else {
pd.sendResponse(createProtocolP2PPacket(new RatioResponse(host,
ratioUp.get(host).longValue(),
ratioDown.get(host).longValue())));
readLock.incrementAndGet();
if (!hostList.contains(host)) {
String l = "";
for (HostItem h: hostList) {
l += h + " ";
}
writeLog(host + " is not in hostlist [ " + l + "]", LogLevel.Debug);
pd.sendResponse(createProtocolP2PPacket(new Payload(RequestResponseCode.UNKNOWN_HOST)));
} else {
pd.sendResponse(createProtocolP2PPacket(new RatioResponse(host,
ratioUp.get(host).longValue(),
ratioDown.get(host).longValue())));
}
readLock.decrementAndGet();
this.notify();
}
} catch (InterruptedException e) {
throw new InternalError();
} catch (Exception e) {
writeLog(e, LogLevel.Error);
}
@ -110,17 +136,31 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
throw new InternalError();
} else {
String[] f = ((FileList)p).getFileList();
for (String file: f) {
List<HostItem> h = fileList.get(file);
if (h != null) {
if (!h.contains(host)) {
h.add(host);
try {
synchronized(this) {
while(writeLock.getAndSet(true)) {
this.wait();
}
} else {
List<HostItem> emptyH = new ArrayList<>();
emptyH.add(host);
fileList.put(file, emptyH);
}
while(readLock.get() > 0) {
this.wait();
}
for (String file: f) {
List<HostItem> h = fileList.get(file);
if (h != null) {
if (!h.contains(host)) {
h.add(host);
}
} else {
List<HostItem> emptyH = new ArrayList<>();
emptyH.add(host);
fileList.put(file, emptyH);
}
}
writeLock.getAndSet(false);
this.notifyAll();
}
} catch(InterruptedException e) {
throw new InternalError();
}
}
}
@ -138,14 +178,28 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
}
HostItem host = ((Unregister)p).getHostItem();
writeLog("Received UNREGISTER from host " + pd.getHostItem() + ". Removing host " + host, LogLevel.Action);
hostList.remove(host);
for(String f: fileList.keySet()) {
fileList.get(f).remove(host);
if(fileList.get(f).isEmpty()) {
fileList.remove(f);
try {
synchronized (this) {
while(writeLock.getAndSet(true)) {
this.wait();
}
while(readLock.get() > 0) {
this.wait();
}
hostList.remove(host);
Map<String, List<HostItem>> fileListCpy = new HashMap<>(fileList);
for(String f: fileListCpy.keySet()) {
fileList.get(f).remove(host);
if(fileList.get(f).isEmpty()) {
fileList.remove(f);
}
}
writeLock.getAndSet(false);
this.notifyAll();
}
} catch (InterruptedException e) {
throw new InternalError();
}
/* Note: we do not remove host from ratioUp and ratioDown since Unregistering is only here to indicate
* the host is temporary not serving any files (ie. is down).
*/
@ -177,11 +231,25 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
HostItem updateRatioClient = updateRatio.getClient();
long ratioSize = updateRatio.getDataSize();
writeLog("Ratio += " + ratioSize + ", client: " + updateRatioClient + " / server: " + updateRatioServer, LogLevel.Debug);
if (!ratioDown.containsKey(updateRatioClient) || ! ratioUp.containsKey(updateRatioServer)) {
sendUnknownHost(pd);
} else {
ratioDown.put(updateRatioClient, Long.valueOf(ratioDown.get(updateRatioClient).longValue() + ratioSize));
ratioUp.put(updateRatioServer, Long.valueOf(ratioUp.get(updateRatioServer).longValue() + ratioSize));
try {
synchronized(this) {
while(writeLock.getAndSet(true)) {
this.wait();
}
while(readLock.get() > 0) {
this.wait();
}
if (!ratioDown.containsKey(updateRatioClient) || ! ratioUp.containsKey(updateRatioServer)) {
sendUnknownHost(pd);
} else {
ratioDown.put(updateRatioClient, Long.valueOf(ratioDown.get(updateRatioClient).longValue() + ratioSize));
ratioUp.put(updateRatioServer, Long.valueOf(ratioUp.get(updateRatioServer).longValue() + ratioSize));
}
writeLock.getAndSet(false);
this.notifyAll();
}
} catch (InterruptedException e) {
throw new InternalError();
}
}
@ -197,13 +265,26 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
}
// add host to known host list
HostItem host = ((Register)p).getHostItem();
if (!hostList.contains(host)) {
hostList.add(host);
try {
synchronized (this) {
while(writeLock.getAndSet(true)) {
this.wait();
}
while(readLock.get() > 0) {
this.wait();
}
if (!hostList.contains(host)) {
hostList.add(host);
}
// initialize ratios if this is a new host
ratioUp.putIfAbsent(host, Long.valueOf(0));
ratioDown.putIfAbsent(host, Long.valueOf(0));
writeLock.getAndSet(false);
this.notifyAll();
}
} catch(InterruptedException e) {
throw new InternalError();
}
// initialize ratios if this is a new host
ratioUp.putIfAbsent(host, Long.valueOf(0));
ratioDown.putIfAbsent(host, Long.valueOf(0));
// send a list request
try {
@ -215,11 +296,45 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
closeHostItemSocket(host);
} catch (EmptyDirectory e) {
writeLog("Empty Directory", LogLevel.Debug);
hostList.remove(host);
try {
synchronized (this) {
while(writeLock.getAndSet(true)) {
this.wait();
}
while(readLock.get() > 0) {
this.wait();
}
if (!hostList.contains(host)) {
hostList.add(host);
}
hostList.remove(host);
writeLock.getAndSet(false);
this.notifyAll();
}
} catch(InterruptedException e2) {
throw new InternalError();
}
writeLog("Received EMPTY DIRECTORY from host " + pd.getHostItem() + ". Aborting.", LogLevel.Action);
} catch (Exception e) {
// remove from list because list request could not be send
hostList.remove(host);
try {
synchronized (this) {
while(writeLock.getAndSet(true)) {
this.wait();
}
while(readLock.get() > 0) {
this.wait();
}
if (!hostList.contains(host)) {
hostList.add(host);
}
hostList.remove(host);
writeLock.getAndSet(false);
this.notifyAll();
}
} catch(InterruptedException e2) {
throw new InternalError();
}
writeLog("Aborting the add of host " + host, LogLevel.Action);
writeLog(e, LogLevel.Error);
}

Loading…
Cancel
Save