Étape 5 #84

Merged
louis_royer merged 19 commits from etape5 into master 2020-03-30 15:23:11 +02:00
16 changed files with 413 additions and 38 deletions
Showing only changes of commit e02386453b - Show all commits

View File

@ -126,8 +126,7 @@ public abstract class ClientDownload extends ServeErrors implements Runnable {
try {
sockList.get(rand.nextInt(sockList.size())).assignTask(offset);
offsetsPending.add(offset);
System.err.println("Assigned task "+ offset);
writeLog("Assigned task "+ offset, LogLevel.Info);
writeLog("Assigned task: #"+ offset, LogLevel.Info);
} catch(InterruptedException e) {
writeLog(e, LogLevel.Error);
throw new InternalError();
@ -232,7 +231,6 @@ public abstract class ClientDownload extends ServeErrors implements Runnable {
writeLog(e, LogLevel.Error);
hash = new byte[0];
} catch (NotFound e) {
writeLog(e, LogLevel.Error);
hash = new byte[0];
} catch (LocalException e) {
writeLog(e, LogLevel.Error);
@ -258,6 +256,7 @@ public abstract class ClientDownload extends ServeErrors implements Runnable {
*/
protected void purgeList() throws InternalError {
List<HostItem> blackList = new ArrayList<HostItem>();
writeLog("Potential peers (before purge): " + hostList.size(), LogLevel.Debug);
boolean first = false;
byte[] hashsum;
for(HostItem host: hostList) {
@ -276,6 +275,7 @@ public abstract class ClientDownload extends ServeErrors implements Runnable {
for(HostItem host: blackList) {
hostList.remove(host);
}
writeLog("Peers (after purge): " + hostList.size(), LogLevel.Debug);
writeLog("Host list purge: done", LogLevel.Info);
}
@ -368,17 +368,11 @@ public abstract class ClientDownload extends ServeErrors implements Runnable {
public void run() {
try {
init();
if (stop) {
writeLog("File is smaller than part max size.", LogLevel.Info);
closeHostItemSocket(hostList.get(0));
} else {
writeLog("File is bigger than part max size.", LogLevel.Info);
purgeList();
initThreads();
while(!stop) {
assignTasks();
checkTasksStatus();
}
purgeList();
initThreads();
while(!stop) {
assignTasks();
checkTasksStatus();
}
writeLog("Reassembling file parts.", LogLevel.Info);
reassembleFile();

View File

@ -8,6 +8,7 @@ import protocolP2P.ProtocolP2PPacket;
import protocolP2P.Payload;
import protocolP2P.LoadRequest;
import protocolP2P.FilePart;
import protocolP2P.Denied;
import localException.InternalError;
import localException.ProtocolError;
import localException.TransmissionError;
@ -203,7 +204,6 @@ public abstract class ClientDownloadPart extends ServeErrors implements Runnable
if (p == null) {
stop = true;
}
failed = downloadPart(p);
if (failed) {
System.err.println("Error: DownloadPart failed.");
@ -224,7 +224,7 @@ public abstract class ClientDownloadPart extends ServeErrors implements Runnable
* @return ProtocolP2PPacketTCP used to send request
*/
protected ProtocolP2PPacket<?> reqPart(Long offset) {
writeLog("New request: " + offset, LogLevel.Info);
writeLog("New request: #" + offset, LogLevel.Info);
// maintain tracking of tasks
if (toDoTasks.contains(offset)) {
try {
@ -279,6 +279,22 @@ public abstract class ClientDownloadPart extends ServeErrors implements Runnable
}
try {
Payload p = d.receiveResponse().getPayload();
if (p instanceof Denied) {
Denied denied = (Denied)p;
if (!denied.getFilename().equals(filename)) {
writeLog("wrong file deny response received: `" + denied.getFilename() + "`", LogLevel.Error);
return true;
}
Long offset = Long.valueOf(denied.getOffset());
if (pendingTasks.contains(offset)) {
pendingTasks.remove(offset);
toDoTasks.add(offset);
return false;
} else {
writeLog("wrong file offset deny received: " + offset, LogLevel.Error);
return true;
}
}
assert p instanceof FilePart : "This payload must be instance of FilePart";
if (!(p instanceof FilePart)) {
writeLog("cannot get size.", LogLevel.Error);

View File

@ -26,6 +26,7 @@ import remoteException.NotFound;
import remoteException.ProtocolRemoteError;
import remoteException.VersionRemoteError;
import remoteException.NotATracker;
import remoteException.UnknownHost;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@ -165,6 +166,9 @@ public abstract class ClientManagement extends ServeErrors implements Runnable {
} catch (NotATracker e) {
writeLog(e, LogLevel.Error);
throw new ProtocolError();
} catch (UnknownHost e) {
writeLog(e, LogLevel.Error);
throw new ProtocolError();
}
}
@ -212,6 +216,7 @@ public abstract class ClientManagement extends ServeErrors implements Runnable {
throw new InternalError();
} else {
downLoader.sendRatioUpdate();
writeLog("Ratio updates sent.", LogLevel.Info);
}
} else {
throw new InternalError();

View File

@ -56,7 +56,7 @@ public class DiscoverResponse extends Payload {
int port = BytesArrayTools.readInt16Bits(packet, i);
i += 2;
String hostname = BytesArrayTools.readString(packet, i, "\n");
i += hostname.length();
i += hostname.length() + 1; // 1 for the "\n"
hostList.add(new HostItem(hostname, port));
}
}

View File

@ -72,7 +72,7 @@ public class LoadRequest extends Payload {
/* Read hostItem */
int portPosition = FILENAME_POSITION + size;
int hostnameStartPosition = portPosition + 2;
int hostnameSize = getPayloadSize(packet) - hostnameStartPosition;
int hostnameSize = getPayloadSize(packet) - hostnameStartPosition + PAYLOAD_START_POSITION;
hostItem = new HostItem(BytesArrayTools.readString(packet, hostnameStartPosition, hostnameSize), BytesArrayTools.readInt16Bits(packet, portPosition));
}

View File

@ -12,6 +12,7 @@ import remoteException.ProtocolRemoteError;
import remoteException.VersionRemoteError;
import remoteException.EmptyFile;
import remoteException.NotATracker;
import remoteException.UnknownHost;
import java.io.IOException;
import tools.HostItem;
@ -72,8 +73,9 @@ public abstract class ProtocolP2PPacket < T extends Payload>{
* @throws SizeError
* @throws IOException
* @throws SocketClosed
* @throws UnknownHost
*/
public abstract ProtocolP2PPacket<?> receiveResponse() throws EmptyFile, NotFound, NotATracker, EmptyDirectory, InternalRemoteError, VersionRemoteError, ProtocolRemoteError, TransmissionError, ProtocolError, VersionError, InternalError, SizeError, IOException, SocketClosed;
public abstract ProtocolP2PPacket<?> receiveResponse() throws EmptyFile, NotFound, NotATracker, EmptyDirectory, InternalRemoteError, VersionRemoteError, ProtocolRemoteError, TransmissionError, ProtocolError, VersionError, InternalError, SizeError, IOException, SocketClosed, UnknownHost;
/** Receive a request, subclasses must overwrite this constructor.
* @param socket socket used to get the request

View File

@ -12,6 +12,7 @@ import remoteException.NotFound;
import remoteException.ProtocolRemoteError;
import remoteException.VersionRemoteError;
import remoteException.EmptyFile;
import remoteException.UnknownHost;
import tools.HostItem;
import protocolP2P.Payload;
import protocolP2P.RequestResponseCode;
@ -212,8 +213,9 @@ public class ProtocolP2PPacketTCP < T extends Payload > extends ProtocolP2PPacke
* @throws SizeError
* @throws IOException
* @throws SocketClosed
* @throws UnknownHost
*/
public ProtocolP2PPacket<?> receiveResponse() throws EmptyFile, NotFound, NotATracker, EmptyDirectory, InternalRemoteError, VersionRemoteError, ProtocolRemoteError, TransmissionError, ProtocolError, VersionError, InternalError, SizeError, IOException, SocketClosed {
public ProtocolP2PPacket<?> receiveResponse() throws EmptyFile, NotFound, NotATracker, EmptyDirectory, InternalRemoteError, VersionRemoteError, ProtocolRemoteError, TransmissionError, ProtocolError, VersionError, InternalError, SizeError, IOException, SocketClosed, UnknownHost {
assert requestSocket != null : "Cannot receive response because request packet not sent.";
if (requestSocket == null) {
throw new InternalError();
@ -253,6 +255,8 @@ public class ProtocolP2PPacketTCP < T extends Payload > extends ProtocolP2PPacke
throw new EmptyFile();
case NOT_A_TRACKER:
throw new NotATracker();
case UNKNOWN_HOST:
throw new UnknownHost();
default :
return (ProtocolP2PPacket)p;
}

View File

@ -12,6 +12,7 @@ import remoteException.NotFound;
import remoteException.ProtocolRemoteError;
import remoteException.VersionRemoteError;
import remoteException.EmptyFile;
import remoteException.UnknownHost;
import tools.BytesArrayTools;
import tools.HostItem;
import protocolP2P.Payload;
@ -207,8 +208,9 @@ public class ProtocolP2PPacketUDP < T extends Payload> extends ProtocolP2PPacket
* @throws InternalError
* @throws SizeError
* @throws IOException
* @throws UnknownHost
*/
public ProtocolP2PPacket<?> receiveResponse() throws EmptyFile, NotFound, NotATracker, EmptyDirectory, InternalRemoteError, VersionRemoteError, ProtocolRemoteError, TransmissionError, ProtocolError, VersionError, InternalError, SizeError, IOException {
public ProtocolP2PPacket<?> receiveResponse() throws EmptyFile, NotFound, NotATracker, EmptyDirectory, InternalRemoteError, VersionRemoteError, ProtocolRemoteError, TransmissionError, ProtocolError, VersionError, InternalError, SizeError, IOException, UnknownHost {
assert requestSocket != null : "Cannot receive response because request packet not sent.";
if (requestSocket == null) {
throw new InternalError();
@ -237,6 +239,8 @@ public class ProtocolP2PPacketUDP < T extends Payload> extends ProtocolP2PPacket
throw new EmptyFile();
case NOT_A_TRACKER:
throw new NotATracker();
case UNKNOWN_HOST:
throw new UnknownHost();
default :
return (ProtocolP2PPacket)p;
}

View File

@ -0,0 +1,177 @@
package serverP2P;
import tools.Logger;
import tools.LogLevel;
import tools.HostItem;
import java.util.Map;
import java.util.HashMap;
import java.io.IOException;
import exception.RemoteException;
import exception.LocalException;
import protocolP2P.RatioRequest;
import protocolP2P.RatioResponse;
import protocolP2P.Payload;
import protocolP2P.ProtocolP2PPacket;
import remoteException.UnknownHost;
import remoteException.NotATracker;
import localException.InternalError;
/** Class allowing to keep the tracker informed about ratios
* @author Louis Royer
* @author Flavien Haas
* @author JS Auge
* @version 1.0
*/
public abstract class RatioWatcher implements Runnable {
final static double punishmentFactor = 1.2;
protected Logger logger;
protected volatile boolean stop;
protected long time;
protected boolean force;
protected HostItem tracker;
protected Thread thread;
protected Map<HostItem, Double> cachePunishmentProbability = new HashMap<>();
protected boolean lock;
/** Constructor
* @param logger Logger
* @param millis Time interval before recheck
* @param tracker HostItem for the tracker
*/
public RatioWatcher(Logger logger, long millis, HostItem tracker) {
assert logger != null : "Logger is null";
assert tracker != null : "Tracker is null";
this.logger = logger;
time = millis;
this.tracker = tracker;
lock = false;
}
/** Runnable implementation */
public void run() {
writeLog("Ratio watcher started : delay " + time + " milliseconds.", LogLevel.Info);
while(!stop) {
try {
clean();
Thread.sleep(time);
} catch(InterruptedException e) {
writeLog("Ratio watcher interrupted", LogLevel.Info);
setStop();
}
}
}
/** Invalidate the cache by cleaning all hashmaps
* @throws InterruptedException
*/
protected synchronized void clean() throws InterruptedException{
while(lock) {
this.wait();
}
lock = true;
cachePunishmentProbability.clear();
lock = false;
this.notifyAll();
}
/** Get Up-ratio for an applications
* @param application HostItem of the application
* @return Punishment Probability
* @throws UnknownHost
*/
protected synchronized double getPunishmentProbability(HostItem application) throws InternalError, UnknownHost {
try {
while(lock) {
this.wait();
}
lock = true;
if (!cachePunishmentProbability.containsKey(application)) {
// update if not in cache
try {
ProtocolP2PPacket<?> p = createProtocolP2PPacket(new RatioRequest(application));
p.sendRequest(getTrackerSocket());
Payload resp = p.receiveResponse().getPayload();
if (!(resp instanceof RatioResponse)) {
throw new InternalError();
}
RatioResponse rresp = (RatioResponse)resp;
if (!rresp.getHostItem().equals(application)) {
writeLog("Ratio response host is not the expected one. Expected : "
+ application + ". Received : " + rresp.getHostItem(), LogLevel.Debug);
throw new InternalError();
}
long up = rresp.getTotalUp();
long down = rresp.getTotalDown();
assert punishmentFactor > 1 : "The punishment factor must be greater than 1";
if (down == 0 || (punishmentFactor * up) >= down) {
cachePunishmentProbability.put(application, Double.valueOf(0));
} else {
cachePunishmentProbability.put(application, Double.valueOf((down - up)/(down * punishmentFactor)));
}
} catch (UnknownHost e) {
throw e;
} catch (IOException e) {
throw new InternalError();
} catch (LocalException e) {
writeLog(e, LogLevel.Error);
throw new InternalError();
} catch (RemoteException e) {
writeLog(e, LogLevel.Error);
throw new InternalError();
}
}
double ret = cachePunishmentProbability.get(application);
lock = false;
this.notifyAll();
return ret;
} catch (InterruptedException e) {
throw new InternalError();
}
}
/** Ask the thread to stop
*/
public void setStop() {
stop = true;
if (thread != null) {
thread.interrupt();
}
}
/** Implementation of writeLog
* @param text Text to log
* @param logLevel level of logging
*/
protected abstract void writeLog(String text, LogLevel logLevel);
/** Implementation of writeLog
* @param e exception to log
* @param logLevel level of logging
*/
protected abstract void writeLog(Exception e, LogLevel logLevel);
/** Set thread
* @param thread Thread
*/
public void setThread(Thread thread) {
this.thread = thread;
}
/** Create packets
* @param payload Payload
*/
protected abstract < T extends Payload > ProtocolP2PPacket<?> createProtocolP2PPacket(T payload);
/** Tracker socket getter
* @return tracker socket
*/
protected abstract Object getTrackerSocket();
/** Closes tracker socket
*/
protected abstract void closeTrackerSocket();
}

View File

@ -0,0 +1,66 @@
package serverP2P;
import tools.Logger;
import tools.LogLevel;
import protocolP2P.ProtocolP2PPacket;
import protocolP2P.ProtocolP2PPacketTCP;
import protocolP2P.Payload;
import tools.HostItem;
import serverP2P.RatioWatcher;
/** Class allowing to keep the tracker informed about file list (TCP impl.)
* @author Louis Royer
* @author Flavien Haas
* @author JS Auge
* @version 1.0
*/
public class RatioWatcherTCP extends RatioWatcher {
/** Constructor
* @param logger Logger
* @param millis Time interval before recheck
* @param tracker HostItem for the tracker
*/
public RatioWatcherTCP(Logger logger, long millis, HostItem tracker) {
super(logger, millis, tracker);
assert logger != null : "Logger is null";
assert tracker != null : "Tracker is null";
}
/** Implementation of writeLog
* @param text Text to log
* @param logLevel level of logging
*/
protected void writeLog(String text, LogLevel logLevel) {
logger.writeTCP(text, logLevel);
}
/** Implementation of writeLog
* @param e exception to log
* @param logLevel level of logging
*/
protected void writeLog(Exception e, LogLevel logLevel) {
logger.writeTCP(e, logLevel);
}
/** Create packets
* @param payload Payload
*/
protected < T extends Payload > ProtocolP2PPacket<T> createProtocolP2PPacket(T payload) {
return (ProtocolP2PPacket<T>)new ProtocolP2PPacketTCP<T>(payload);
}
/** Tracker socket getter
* @return tracker socket
*/
protected Object getTrackerSocket() {
return tracker.getTCPSocket();
}
/** Closes tracker socket
*/
protected void closeTrackerSocket() {
tracker.closeTCPSocket();
}
}

View File

@ -0,0 +1,64 @@
package serverP2P;
import tools.Logger;
import tools.LogLevel;
import protocolP2P.ProtocolP2PPacket;
import protocolP2P.ProtocolP2PPacketUDP;
import protocolP2P.Payload;
import tools.HostItem;
import serverP2P.RatioWatcher;
/** Class allowing to keep the tracker informed about file list (UDP impl.)
* @author Louis Royer
* @author Flavien Haas
* @author JS Auge
* @version 1.0
*/
public class RatioWatcherUDP extends RatioWatcher {
/** Constructor
* @param logger Logger
* @param millis Time interval before recheck
* @param tracker HostItem for the tracker
*/
public RatioWatcherUDP(Logger logger, long millis, HostItem tracker) {
super(logger, millis, tracker);
assert logger != null : "Logger is null";
assert tracker != null : "Tracker is null";
}
/** Implementation of writeLog
* @param text Text to log
* @param logLevel level of logging
*/
protected void writeLog(String text, LogLevel logLevel) {
logger.writeUDP(text, logLevel);
}
/** Implementation of writeLog
* @param e exception to log
* @param logLevel level of logging
*/
protected void writeLog(Exception e, LogLevel logLevel) {
logger.writeUDP(e, logLevel);
}
/** Create packets
* @param payload Payload
*/
protected < T extends Payload > ProtocolP2PPacket<T> createProtocolP2PPacket(T payload) {
return (ProtocolP2PPacket<T>)new ProtocolP2PPacketUDP<T>(payload);
}
/** Tracker socket getter
* @return tracker socket
*/
protected Object getTrackerSocket() {
return tracker.getUDPSocket();
}
/** Closes tracker socket
*/
protected void closeTrackerSocket() {
tracker.closeUDPSocket();
}
}

View File

@ -1,5 +1,6 @@
package serverP2P;
import serverP2P.FileWatcher;
import serverP2P.RatioWatcher;
import tools.Logger;
import tools.LogLevel;
import tools.HostItem;
@ -16,14 +17,18 @@ import protocolP2P.HashAlgorithm;
import protocolP2P.Unregister;
import protocolP2P.SizeRequest;
import protocolP2P.SizeResponse;
import protocolP2P.Denied;
import java.nio.file.Paths;
import java.nio.file.Files;
import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.Random;
import java.io.IOException;
import exception.LocalException;
import localException.InternalError;
import remoteException.UnknownHost;
/** Implementation of P2P-JAVA-PROJECT VERSION 1.0 protocol.
* @author Louis Royer
@ -39,6 +44,8 @@ public abstract class ServerManagement extends ServeErrors implements Runnable {
protected String baseDirectory;
protected HostItem server;
protected HostItem tracker;
protected Random punisher = new Random();
protected RatioWatcher ratioWatcher;
/** Constructor */
public ServerManagement(String baseDirectory, HostItem server, HostItem tracker, Logger logger) {
@ -58,6 +65,7 @@ public abstract class ServerManagement extends ServeErrors implements Runnable {
public void setStop() {
stop = true;
fileListWatcher.setStop();
ratioWatcher.setStop();
sendUnregisterRequest();
closeSocket();
}
@ -145,27 +153,48 @@ public abstract class ServerManagement extends ServeErrors implements Runnable {
long offset = ((LoadRequest)p).getOffset();
long maxSizePartialContent = ((LoadRequest)p).getMaxSizePartialContent();
try {
byte[] fullLoad = Files.readAllBytes(Paths.get(baseDirectory + filename));
long sizeToSend = 0;
if (fullLoad.length - offset < maxSizePartialContent) {
writeLog("Sending last partialContent", LogLevel.Debug);
sizeToSend = fullLoad.length - offset;
} else {
sizeToSend = maxSizePartialContent;
}
writeLog("maxSizePartialContent: " + maxSizePartialContent, LogLevel.Debug);
writeLog("Sending " + filename + " from " + offset + " to " + (offset + sizeToSend), LogLevel.Debug);
byte[] load = Arrays.copyOfRange(fullLoad, (int)offset, (int)(offset + sizeToSend));
String[] fileList = fileListWatcher.getFileList();
if (Arrays.binarySearch(fileList, filename) >= 0) {
try {
if (load.length == 0) {
sendEmptyFile(pd);
double proba = ratioWatcher.getPunishmentProbability(((LoadRequest)p).getHostItem());
if (punisher.nextDouble() <= proba) {
writeLog("Sending punishment", LogLevel.Debug);
pd.sendResponse(createProtocolP2PPacket(new Denied(filename, offset)));
} else {
pd.sendResponse(createProtocolP2PPacket((Payload)(new FilePart(filename, offset, load))));
byte[] fullLoad = Files.readAllBytes(Paths.get(baseDirectory + filename));
long sizeToSend = 0;
if (fullLoad.length - offset < maxSizePartialContent) {
writeLog("Sending last partialContent", LogLevel.Debug);
sizeToSend = fullLoad.length - offset;
} else {
sizeToSend = maxSizePartialContent;
}
writeLog("maxSizePartialContent: " + maxSizePartialContent, LogLevel.Debug);
writeLog("Sending " + filename + " from " + offset + " to " + (offset + sizeToSend), LogLevel.Debug);
byte[] load = Arrays.copyOfRange(fullLoad, (int)offset, (int)(offset + sizeToSend));
try {
if (load.length == 0) {
sendEmptyFile(pd);
} else {
pd.sendResponse(createProtocolP2PPacket((Payload)(new FilePart(filename, offset, load))));
}
} catch (Exception e2) {
writeLog(e2, LogLevel.Error);
}
}
} catch (Exception e2) {
writeLog(e2, LogLevel.Error);
} catch (InternalError e) {
writeLog("InternalError", LogLevel.Debug);
writeLog(e, LogLevel.Debug);
sendInternalError(pd);
return;
} catch (UnknownHost e) {
writeLog("Unknown host: " + ((LoadRequest)p).getHostItem(), LogLevel.Debug);
writeLog(e, LogLevel.Debug);
sendInternalError(pd);
return;
} catch(LocalException e) {
sendInternalError(pd);
return;
}
} else {
writeLog("File requested not found: `" + filename + "` " + Arrays.binarySearch(fileList, filename), LogLevel.Debug);

View File

@ -82,6 +82,10 @@ public class ServerManagementTCP extends ServerManagement {
Thread flwt = new Thread(fileListWatcher);
flwt.start();
fileListWatcher.setThread(flwt);
ratioWatcher = (RatioWatcher)new RatioWatcherTCP(logger, 10000, tracker);
Thread rwt = new Thread(ratioWatcher);
rwt.start();
ratioWatcher.setThread(rwt);
while(!stop) {
try {
Socket s = socket.accept();

View File

@ -77,6 +77,10 @@ public class ServerManagementUDP extends ServerManagement {
Thread flwt = new Thread(fileListWatcher);
flwt.start();
fileListWatcher.setThread(flwt);
ratioWatcher = (RatioWatcher)new RatioWatcherUDP(logger, 10000, tracker);
Thread rwt = new Thread(ratioWatcher);
rwt.start();
ratioWatcher.setThread(rwt);
while(!stop) {
try {
ProtocolP2PPacketUDP<?> pd = new ProtocolP2PPacketUDP<>((Object)socket);

View File

@ -128,7 +128,7 @@ public class HostItem {
boolean result = false;
if (other instanceof HostItem) {
HostItem that = (HostItem) other;
result = this.getHostname() == that.getHostname() && this.getPort() == that.getPort();
result = this.getHostname().equals(that.getHostname()) && this.getPort() == that.getPort();
}
return result;
}

View File

@ -79,8 +79,14 @@ public abstract class TrackerManagement extends ServeErrors implements Runnable
sendInternalError(pd);
} else {
HostItem host = ((RatioRequest)p).getHostItem();
writeLog("Ratio request for host " + host, LogLevel.Debug);
try {
if (!hostList.contains(host)) {
String l = "";
for (HostItem h: hostList) {
l += h + " ";
}
writeLog(host + " is not in hostlist [ " + l + "]", LogLevel.Debug);
pd.sendResponse(createProtocolP2PPacket(new Payload(RequestResponseCode.UNKNOWN_HOST)));
} else {
pd.sendResponse(createProtocolP2PPacket(new RatioResponse(host,