Fix #27 #64

Closed
louis_royer wants to merge 2 commits from issue27 into etape4

@ -19,8 +19,8 @@ import tools.HostItem;
*/ */
public class ClientP2P { public class ClientP2P {
private String subdir = "seeded/"; private String logDir = "logs/";
private String parts = ".parts"; private String partsDir = ".parts/";
private Logger loggerServer; private Logger loggerServer;
private Logger loggerClient; private Logger loggerClient;
private String host; private String host;
@ -34,11 +34,13 @@ public class ClientP2P {
/** Initialize loggers if directories and logger are null, /** Initialize loggers if directories and logger are null,
* else fail silently. * else fail silently.
*/ */
public void initLogger() { public void initDirectoriesAndLoggers() {
if (directories == null && loggerServer == null && loggerClient == null) { if (directories == null && loggerServer == null && loggerClient == null) {
directories = new Directories("P2P_JAVA_PROJECT_" + port); directories = new Directories("P2P_JAVA_PROJECT_" + port);
loggerServer = new Logger(directories.getDataHomeDirectory() + "server.log"); directories.createSubdir(logDir);
loggerClient = new Logger(directories.getDataHomeDirectory() + "client.log"); loggerServer = new Logger(directories.getDataHomeDirectory() + logDir + "server.log");
loggerClient = new Logger(directories.getDataHomeDirectory() + logDir + "client.log");
directories.createSubdir(partsDir);
} }
} }
@ -53,16 +55,14 @@ public class ClientP2P {
} catch (NumberFormatException e){ } catch (NumberFormatException e){
int oldPort = port; int oldPort = port;
port = defaultPort; port = defaultPort;
initLogger(); initDirectoriesAndLoggers();
System.err.println("Error incorrect port " + oldPort + " using default port " + defaultPort); System.err.println("Error incorrect port " + oldPort + " using default port " + defaultPort);
loggerServer.write("incorrect port " + oldPort + " using default port " + defaultPort, LogLevel.Info); loggerServer.write("incorrect port " + oldPort + " using default port " + defaultPort, LogLevel.Info);
} }
initLogger(); initDirectoriesAndLoggers();
directories.createSubdir(subdir);
directories.createSubdir(parts);
host = "localhost"; host = "localhost";
System.out.println("Server will listen on port " + port + " and serve files from " + directories.getDataHomeDirectory() + subdir); System.out.println("Server will listen on port " + port + " and serve files from " + directories.getDataHomeDirectory());
directories.askOpenDataHomeDirectory(subdir, scanner); directories.askOpenDataHomeDirectory(null, scanner);
System.out.println("Please enter list of servers to use; first one will be used to ask list of files"); System.out.println("Please enter list of servers to use; first one will be used to ask list of files");
} }
@ -84,8 +84,8 @@ public class ClientP2P {
} }
// Server threads // Server threads
ServerManagementUDP smudp = new ServerManagementUDP(c.directories.getDataHomeDirectory() + c.subdir, "localhost", c.port, c.loggerServer, c.tracker); ServerManagementUDP smudp = new ServerManagementUDP(c.directories.getDataHomeDirectory(), "localhost", c.port, c.loggerServer, c.tracker);
ServerManagementTCP smtcp = new ServerManagementTCP(c.directories.getDataHomeDirectory() + c.subdir, "localhost", c.port, c.loggerServer, c.tracker); ServerManagementTCP smtcp = new ServerManagementTCP(c.directories.getDataHomeDirectory(), "localhost", c.port, c.loggerServer, c.tracker);
Thread tudp = new Thread(smudp); Thread tudp = new Thread(smudp);
tudp.setName("server UDP P2P-JAVA-PROJECT (port: " + c.port + ")"); tudp.setName("server UDP P2P-JAVA-PROJECT (port: " + c.port + ")");
tudp.start(); tudp.start();
@ -96,7 +96,7 @@ public class ClientP2P {
// Wait a bit before printing client interface // Wait a bit before printing client interface
// This is not required, but allow to have a cleaner interface // This is not required, but allow to have a cleaner interface
try { try {
Thread.sleep(100); Thread.sleep(200);
} catch(InterruptedException e) { } catch(InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
@ -112,7 +112,7 @@ public class ClientP2P {
case "upd": // alias typo case "upd": // alias typo
case "2" : case "2" :
System.out.println("Starting with UDP"); System.out.println("Starting with UDP");
ClientManagementUDP cmudp = new ClientManagementUDP(c.directories.getDataHomeDirectory(), c.tracker, c.directories.getDataHomeDirectory() + c.parts + "/", c.loggerClient, c.scanner); ClientManagementUDP cmudp = new ClientManagementUDP(c.directories.getDataHomeDirectory(), c.tracker, c.directories.getDataHomeDirectory() + c.partsDir, c.loggerClient, c.scanner);
t = new Thread(cmudp); t = new Thread(cmudp);
break; break;
case "TCP": case "TCP":
@ -120,7 +120,7 @@ public class ClientP2P {
case "1": case "1":
default: default:
System.out.println("Starting with TCP"); System.out.println("Starting with TCP");
ClientManagementTCP cmtcp = new ClientManagementTCP(c.directories.getDataHomeDirectory(), c.tracker, c.directories.getDataHomeDirectory() + c.parts + "/", c.loggerClient, c.scanner); ClientManagementTCP cmtcp = new ClientManagementTCP(c.directories.getDataHomeDirectory(), c.tracker, c.directories.getDataHomeDirectory() + c.partsDir, c.loggerClient, c.scanner);
t = new Thread(cmtcp); t = new Thread(cmtcp);
break; break;
} }

@ -1,5 +1,5 @@
package exception; package exception;
public class LocalException extends Exception { public abstract class LocalException extends Exception {
private static final long serialVersionUID = 12L; private static final long serialVersionUID = 12L;
} }

@ -1,5 +1,5 @@
package exception; package exception;
public class RemoteException extends Exception { public abstract class RemoteException extends Exception {
private static final long serialVersionUID = 12L; private static final long serialVersionUID = 12L;
} }

@ -51,6 +51,8 @@ public class ServerManagementTCP implements Runnable {
private Logger logger; private Logger logger;
private HostItem tracker; private HostItem tracker;
private HostItem server; private HostItem server;
private FileListWatcher fileListWatcher;
private volatile boolean stop;
/** Constructor for TCP implementation, with baseDirectory and TCPPort parameters. /** Constructor for TCP implementation, with baseDirectory and TCPPort parameters.
* @param baseDirectory the root directory where files are stored * @param baseDirectory the root directory where files are stored
@ -60,12 +62,12 @@ public class ServerManagementTCP implements Runnable {
* @param tracker Tracker * @param tracker Tracker
*/ */
public ServerManagementTCP(String baseDirectory, String hostName, int port, Logger logger, HostItem tracker) { public ServerManagementTCP(String baseDirectory, String hostName, int port, Logger logger, HostItem tracker) {
stop = false;
server = new HostItem(hostName, port); server = new HostItem(hostName, port);
fileList = new String[0];
this.tracker = tracker; this.tracker = tracker;
this.logger = logger; this.logger = logger;
this.baseDirectory = baseDirectory; this.baseDirectory = baseDirectory;
initFileList();
initSha512();
try { try {
socket = new ServerSocket(server.getPort(), 10, server.getInetAddress()); socket = new ServerSocket(server.getPort(), 10, server.getInetAddress());
} catch (SocketException e) { } catch (SocketException e) {
@ -77,12 +79,26 @@ public class ServerManagementTCP implements Runnable {
} }
} }
/** Stop the thread */
public void setStop() {
stop = true;
}
/** Trigger a manual check of the file list
*/
public void updateFileList() {
if (fileListWatcher != null) {
fileListWatcher.trigger();
}
}
/** Implementation of runnable. This methods allows to run the server. /** Implementation of runnable. This methods allows to run the server.
*/ */
public void run() { public void run() {
logger.writeTCP("Server sucessfully started", LogLevel.Info); logger.writeTCP("Server sucessfully started", LogLevel.Info);
(new Thread(new TrackerRegisterer())).start(); fileListWatcher = new FileListWatcher(10000); // checking every 10 seconds
do { (new Thread(fileListWatcher)).start();
while(!stop) {
try { try {
Socket s = socket.accept(); Socket s = socket.accept();
ClientHandler c = new ClientHandler(s); ClientHandler c = new ClientHandler(s);
@ -90,7 +106,17 @@ public class ServerManagementTCP implements Runnable {
} catch (IOException e) { } catch (IOException e) {
logger.writeTCP("Error while accepting new connection", LogLevel.Warning); logger.writeTCP("Error while accepting new connection", LogLevel.Warning);
} }
} while(true); }
fileListWatcher.setStop();
// unregistering from tracker
try {
logger.writeTCP("Unregistering from tracker", LogLevel.Info);
ProtocolP2PPacket p = (ProtocolP2PPacket)new ProtocolP2PPacketTCP((Payload)new Unregister(server));
p.sendRequest((Object)tracker.getTCPSocket());
} catch (Exception e) {
logger.writeTCP("Cannot unregister from tracker", LogLevel.Error);
logger.writeTCP(e, LogLevel.Error);
}
} }
/** Private runnable class allowing to serve one client. /** Private runnable class allowing to serve one client.
@ -167,22 +193,6 @@ public class ServerManagementTCP implements Runnable {
} }
} }
/** Initialize local list of all files allowed to be shared.
*/
private void initFileList() {
File folder = new File(baseDirectory);
Vector<String> v = new Vector<String>();
File[] files = folder.listFiles();
/* Add non-recursively files's names to fileList */
for (File f : files) {
if (f.isFile()) {
v.add(f.getName());
}
}
fileList = new String[v.size()];
v.toArray(fileList);
Arrays.sort(fileList);
}
/** Init sha512 map. /** Init sha512 map.
*/ */
@ -333,39 +343,89 @@ public class ServerManagementTCP implements Runnable {
} }
} }
/** Private runnable class allowing to initialize tracker while initializing server. */ /** Private runnable class allowing to keep the tracker informed about file list. */
private class TrackerRegisterer implements Runnable { private class FileListWatcher implements Runnable {
private volatile boolean stop;
/** Runnable implementation */ private long time;
public void run() { private boolean force;
/** Register server on tracker
*/
private void registerTracker() {
try { try {
registerTracker(); logger.writeTCP("Trying to into tracker", LogLevel.Info);
ProtocolP2PPacket p = (ProtocolP2PPacket)new ProtocolP2PPacketTCP((Payload)new Register(server));
p.sendRequest((Object)tracker.tryGetTCPSocket());
logger.writeTCP("Register request sent.", LogLevel.Debug);
tracker.closeTCPSocket();
} catch (Exception e) { } catch (Exception e) {
logger.writeTCP(e, LogLevel.Error); // error, trying again at next iteration
System.exit(-4); force = true;
logger.writeTCP("Cannot contact tracker, trying again at next iteration (" + time + " milliseconds).", LogLevel.Error);
} }
} }
/** Register server on tracker
* @throws InternalError /** Update fileList and returns true if different than old list.
* @throws IOException * @return true if changed
* @throws SocketClosed
*/ */
private void registerTracker() throws InternalError, IOException, SocketClosed { private boolean updateFileList() {
logger.writeTCP("Unregistering from tracker", LogLevel.Info); File folder = new File(baseDirectory);
ProtocolP2PPacket p = (ProtocolP2PPacket)new ProtocolP2PPacketTCP((Payload)new Unregister(server)); Vector<String> v = new Vector<String>();
p.sendRequest((Object)tracker.getTCPSocket()); File[] files = folder.listFiles();
// FIXME: this is a hack /* Add non-recursively files's names to fileList */
// ProtocolP2PPacketTCP reads 1024 bytes but if 2 request comes at the same time InputStream is cleared fully for (File f : files) {
// and we keep waiting forever on the other side if (f.isFile()) {
// a fix could be to read only the header first, and read the required size after v.add(f.getName());
try { }
Thread.sleep(100); }
} catch (InterruptedException e) {} String[] newFileList = new String[v.size()];
logger.writeTCP("Registering into tracker", LogLevel.Info); v.toArray(newFileList);
p = (ProtocolP2PPacket)new ProtocolP2PPacketTCP((Payload)new Register(server)); Arrays.sort(newFileList);
p.sendRequest((Object)tracker.getTCPSocket()); if (!Arrays.equals(newFileList, fileList)) {
logger.writeTCP("Registering completed", LogLevel.Debug); fileList = newFileList;
tracker.closeTCPSocket(); initSha512();
return true;
} else {
return false;
}
}
/** Constructor with millis parameter
* @param millis interval of time between checks
*/
public FileListWatcher(long millis) {
this.stop = false;
this.time = millis;
}
/** Ask the thread to stop
*/
public void setStop() {
stop = true;
}
/** Allow a manual check
*/
public void trigger() {
if (updateFileList() || force) {
force = false;
logger.writeTCP("File list watcher detected changes. Informing tracker.", LogLevel.Info);
registerTracker();
}
}
/** Runnable implementation */
public void run() {
logger.writeTCP("File list watcher started : delay " + time + " milliseconds.", LogLevel.Info);
while(!stop) {
trigger();
try {
Thread.sleep(time);
} catch(InterruptedException e) {
logger.writeTCP("File list watcher interrupted", LogLevel.Error);
setStop();
}
}
} }
} }

@ -43,13 +43,15 @@ import java.net.UnknownHostException;
*/ */
public class ServerManagementUDP implements Runnable { public class ServerManagementUDP implements Runnable {
private String[] fileList; private String[] fileList = new String[0];
private Map<String, byte[]> sha512 = new HashMap<>(); private Map<String, byte[]> sha512 = new HashMap<>();
private String baseDirectory; private String baseDirectory;
private DatagramSocket socket; private DatagramSocket socket;
private Logger logger; private Logger logger;
private HostItem tracker; private HostItem tracker;
private HostItem server; private HostItem server;
private FileListWatcher fileListWatcher;
private volatile boolean stop;
/** Constructor for UDP implementation, with baseDirectory and UDPPort parameters. /** Constructor for UDP implementation, with baseDirectory and UDPPort parameters.
* @param baseDirectory the root directory where files are stored * @param baseDirectory the root directory where files are stored
@ -59,12 +61,11 @@ public class ServerManagementUDP implements Runnable {
* @param tracker Tracker * @param tracker Tracker
*/ */
public ServerManagementUDP(String baseDirectory, String hostName, int port, Logger logger, HostItem tracker) { public ServerManagementUDP(String baseDirectory, String hostName, int port, Logger logger, HostItem tracker) {
stop = false;
server = new HostItem(hostName, port); server = new HostItem(hostName, port);
this.logger = logger; this.logger = logger;
this.baseDirectory = baseDirectory; this.baseDirectory = baseDirectory;
this.tracker = tracker; this.tracker = tracker;
initFileList();
initSha512();
try { try {
socket = new DatagramSocket(server.getPort(), server.getInetAddress()); socket = new DatagramSocket(server.getPort(), server.getInetAddress());
} catch (SocketException e) { } catch (SocketException e) {
@ -73,12 +74,26 @@ public class ServerManagementUDP implements Runnable {
} }
} }
/** Stop the thread */
public void setStop() {
stop = true;
}
/** Trigger a manual check of the file list
*/
public void updateFileList() {
if (fileListWatcher != null) {
fileListWatcher.trigger();
}
}
/** Implementation of runnable. This methods allows to run the server. /** Implementation of runnable. This methods allows to run the server.
*/ */
public void run() { public void run() {
logger.writeUDP("Server sucessfully started", LogLevel.Info); logger.writeUDP("Server sucessfully started", LogLevel.Info);
(new Thread(new TrackerRegisterer())).start(); fileListWatcher = new FileListWatcher(10000); // checking every 10 seconds
while(true) { (new Thread(fileListWatcher)).start();
while(!stop) {
try { try {
ProtocolP2PPacketUDP pd = new ProtocolP2PPacketUDP((Object)socket); ProtocolP2PPacketUDP pd = new ProtocolP2PPacketUDP((Object)socket);
Payload p = pd.getPayload(); Payload p = pd.getPayload();
@ -111,23 +126,15 @@ public class ServerManagementUDP implements Runnable {
} catch (SizeError e) { } catch (SizeError e) {
} }
} }
} fileListWatcher.setStop();
// unregistering from tracker
/** Initialize local list of all files allowed to be shared. try {
*/ logger.writeUDP("Unregistering from tracker", LogLevel.Info);
private void initFileList() { ProtocolP2PPacket p = (ProtocolP2PPacket)new ProtocolP2PPacketUDP((Payload)new Unregister(server));
File folder = new File(baseDirectory); p.sendRequest((Object)tracker.getUDPSocket());
Vector<String> v = new Vector<String>(); } catch (Exception e) {
File[] files = folder.listFiles(); logger.writeUDP(e, LogLevel.Error);
/* Add non-recursively files's names to fileList */
for (File f : files) {
if (f.isFile()) {
v.add(f.getName());
}
} }
fileList = new String[v.size()];
v.toArray(fileList);
Arrays.sort(fileList);
} }
/** Init sha512 map. /** Init sha512 map.
@ -280,32 +287,89 @@ public class ServerManagementUDP implements Runnable {
} }
} }
/** Private runnable class allowing to initialize tracker while initializing server. */ /** Private runnable class allowing to keep the tracker informed about file list. */
private class TrackerRegisterer implements Runnable { private class FileListWatcher implements Runnable {
private volatile boolean stop;
/** Runnable implementation */ private long time;
public void run() { private boolean force;
/** Register server on tracker
*/
private void registerTracker() {
try { try {
registerTracker(); logger.writeUDP("Trying to register into tracker", LogLevel.Info);
ProtocolP2PPacket p = (ProtocolP2PPacket)new ProtocolP2PPacketUDP((Payload)new Register(server));
p.sendRequest((Object)tracker.getUDPSocket());
logger.writeUDP("Register request sent (but cannot ensure reception).", LogLevel.Debug);
tracker.closeUDPSocket();
} catch (Exception e) { } catch (Exception e) {
logger.writeUDP(e, LogLevel.Error); force = true;
System.exit(-4); logger.writeUDP("Cannot contact tracker, trying again at next iteration (" + time + " milliseconds).", LogLevel.Error);
} }
} }
/** Register server on tracker
* @throws InternalError /** Update fileList and returns true if different than old list.
* @throws IOException * @return true if changed
* @throws SocketClosed
*/ */
private void registerTracker() throws InternalError, IOException, SocketClosed { private boolean updateFileList() {
logger.writeUDP("Unregistering from tracker", LogLevel.Info); File folder = new File(baseDirectory);
ProtocolP2PPacket p = (ProtocolP2PPacket)new ProtocolP2PPacketUDP((Payload)new Unregister(server)); Vector<String> v = new Vector<String>();
p.sendRequest((Object)tracker.getUDPSocket()); File[] files = folder.listFiles();
logger.writeUDP("Registering into tracker", LogLevel.Info); /* Add non-recursively files's names to fileList */
p = (ProtocolP2PPacket)new ProtocolP2PPacketUDP((Payload)new Register(server)); for (File f : files) {
p.sendRequest((Object)tracker.getUDPSocket()); if (f.isFile()) {
logger.writeUDP("Registering completed", LogLevel.Debug); v.add(f.getName());
tracker.closeUDPSocket(); }
}
String[] newFileList = new String[v.size()];
v.toArray(newFileList);
Arrays.sort(newFileList);
if (!Arrays.equals(newFileList, fileList)) {
fileList = newFileList;
initSha512();
return true;
} else {
return false;
}
}
/** Constructor with millis parameter
* @param millis interval of time between checks
*/
public FileListWatcher(long millis) {
this.force = true;
this.stop = false;
this.time = millis;
}
/** Ask the thread to stop
*/
public void setStop() {
stop = true;
}
/** Allow a manual check
*/
public void trigger() {
if (updateFileList() || force) {
force = false;
logger.writeUDP("File list watcher detected changes. Informing tracker.", LogLevel.Info);
registerTracker();
}
}
/** Runnable implementation */
public void run() {
logger.writeUDP("File list watcher started : delay " + time + " milliseconds.", LogLevel.Info);
while(!stop) {
trigger();
try {
Thread.sleep(time);
} catch(InterruptedException e) {
logger.writeUDP("File list watcher interrupted", LogLevel.Error);
setStop();
}
}
} }
} }
} }

@ -49,6 +49,19 @@ public class HostItem {
return tcpSocket; return tcpSocket;
} }
/** Get TCP Socket.
* @return TCP Socket
* @throws SocketException
* @throws UnknownHostException
* @throws IOException
*/
public Socket tryGetTCPSocket() throws SocketException, UnknownHostException, IOException {
if (tcpSocket == null) {
tcpSocket = new Socket(InetAddress.getByName(hostname), port);
}
return tcpSocket;
}
/** Closes tcp socket /** Closes tcp socket
*/ */
public void closeTCPSocket() { public void closeTCPSocket() {

@ -75,9 +75,11 @@ public class TrackerManagementUDP implements Runnable {
sendNotFound(pd); sendNotFound(pd);
break; break;
case REGISTER: case REGISTER:
logger.writeUDP("Received REGISTER from host " + pd.getHostItem(), LogLevel.Debug);
handleRegister(pd); handleRegister(pd);
break; break;
case UNREGISTER: case UNREGISTER:
logger.writeUDP("Received UNREGISTER from host " + pd.getHostItem(), LogLevel.Debug);
handleUnregister(pd); handleUnregister(pd);
break; break;
case DISCOVER_REQUEST: case DISCOVER_REQUEST:

Loading…
Cancel
Save