From 733767c4de59d13ad6141aaa7089a1b059a1011a Mon Sep 17 00:00:00 2001 From: Louis Date: Thu, 12 Mar 2020 17:52:31 +0100 Subject: [PATCH 1/5] Add mulithreaded UDP client --- src/clientP2P/ClientDownloadPartUDP.java | 297 ++++++++++++++++++ src/clientP2P/ClientDownloadUDP.java | 372 +++++++++++++++++++++++ src/clientP2P/ClientManagementUDP.java | 121 ++------ src/clientP2P/ClientP2P.java | 2 +- 4 files changed, 695 insertions(+), 97 deletions(-) create mode 100644 src/clientP2P/ClientDownloadPartUDP.java create mode 100644 src/clientP2P/ClientDownloadUDP.java diff --git a/src/clientP2P/ClientDownloadPartUDP.java b/src/clientP2P/ClientDownloadPartUDP.java new file mode 100644 index 0000000..4ec0391 --- /dev/null +++ b/src/clientP2P/ClientDownloadPartUDP.java @@ -0,0 +1,297 @@ +package clientP2P; +import java.util.List; +import java.util.ArrayList; +import java.net.DatagramSocket; +import protocolP2P.ProtocolP2PPacketUDP; +import protocolP2P.Payload; +import protocolP2P.LoadRequest; +import protocolP2P.FilePart; +import exception.InternalError; +import remoteException.EmptyDirectory; +import remoteException.EmptyFile; +import exception.ProtocolError; +import remoteException.InternalRemoteError; +import remoteException.VersionRemoteError; +import exception.TransmissionError; +import remoteException.ProtocolRemoteError; +import exception.VersionError; +import exception.SizeError; +import remoteException.NotFound; +import java.nio.file.Files; +import java.io.File; +import java.nio.file.Paths; +import java.io.IOException; + +/** Class to download file parts on udp. +* @author Louis Royer +* @author Flavien Haas +* @author JS Auge +* @version 1.0 +*/ +public class ClientDownloadPartUDP implements Runnable { + + private List toDoTasks; + private List pendingTasks; + private List tasksDone; + private volatile boolean tasksListsLock; + private volatile boolean stop; + private volatile boolean failed; + private String filename; + private DatagramSocket socket; + private volatile boolean noTask; + private String partsSubdir; + private static final long MAX_PARTIAL_SIZE = 4096; + + /** Constructor with filename, socket, and part subdir + * @param filename name of file to download + * @param socket socket to use + * @param partsSubdir directory to store .part files + */ + public ClientDownloadPartUDP(String filename, DatagramSocket socket, String partsSubdir) { + this.partsSubdir = partsSubdir; + this.filename = filename; + this.socket = socket; + stop = false; + failed = false; + pendingTasks = new ArrayList<>(); + toDoTasks = new ArrayList<>(); + tasksDone = new ArrayList<>(); + noTask = true; + tasksListsLock = false; + } + + /** True if thread has failed to get a file. + * @return true if thread has failed to get a file + */ + public boolean hasFailed() { + return failed; + } + + /** Asks to stop thread. + * @throws InterruptedException + */ + public synchronized void setStop() throws InterruptedException { + stop = true; + this.notifyAll(); + } + + /** Runnable implementation */ + public void run() { + while(!stop) { + try { + doTasks(); + } catch(InterruptedException e) { + try { + setStop(); + } catch (InterruptedException e2) { + } + } + } + System.err.println("Closing socket"); + socket.close(); + } + + /** Get list of offsets that have not be downloaded if failed, else + * empty list. + * @return list of offsets + */ + public List getFailed() { + List ret = new ArrayList<>(); + if (failed) { + ret.addAll(pendingTasks); + ret.addAll(toDoTasks); + } + return ret; + } + + /** Get list of downloaded file parts offset, then clear this list. + * @return list of offsets + * @throws InterruptedException + */ + public List getDone() throws InterruptedException { + if (tasksDone.size() == 0) { + return new ArrayList<>(); + } else { + synchronized (this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + List ret = new ArrayList<>(tasksDone); + tasksDone.clear(); + tasksListsLock = false; + this.notifyAll(); + return ret; + } + } + } + + /** Adds offset of files parts to download. + * @param task offset to download + * @throws InterruptedException + */ + public synchronized void assignTask(Long task) throws InterruptedException { + synchronized(this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + toDoTasks.add(task); + noTask = false; + tasksListsLock = false; + this.notifyAll(); + } + } + + /** Send one request and wait for one response. Blocks when no task. + * @throws InterruptedException + */ + public synchronized void doTasks() throws InterruptedException { + while(noTask && !stop) { + this.wait(); + } + if (!stop) { + try { + Long offset = toDoTasks.get(0); + ProtocolP2PPacketUDP p = reqPart(offset); + if (p == null) { + stop = true; + } + + failed = downloadPart(p); + if (failed) { + System.err.println("Error: DownloadPart failed."); + stop = true; + } else if (toDoTasks.isEmpty()) { + noTask = true; + } + } catch (IndexOutOfBoundsException e) { + noTask = true; + } + } + } + + public ProtocolP2PPacketUDP reqPart(Long offset) { + System.err.println("New request: "+ offset); + // maintain tracking of tasks + if (toDoTasks.contains(offset)) { + try { + synchronized (this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + toDoTasks.remove(offset); + pendingTasks.add(offset); + tasksListsLock = false; + this.notifyAll(); + } + } catch(InterruptedException e) { + System.err.println("Error: reqPart interruptedException"); + return null; + } + } else { + System.err.println("Error: reqPart (offset " + offset + " not in toDoTasks)"); + return null; + } + // send request + try { + ProtocolP2PPacketUDP d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, offset.longValue(), MAX_PARTIAL_SIZE)); + d.sendRequest((Object)socket); + return d; + } catch (InternalError e) { + System.err.println("Error: reqPart internalError"); + return null; + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Error: reqPart ioexception"); + return null; + } + } + + public boolean downloadPart(ProtocolP2PPacketUDP d) { + if (d == null) { + System.err.println("Error: downloadPart -> d is null."); + return true; + } + try { + Payload p = d.receiveResponse().getPayload(); + assert p instanceof FilePart : "This payload must be instance of FilePart"; + if (!(p instanceof FilePart)) { + System.err.println("Error: cannot get size."); + return true; + } else { + FilePart fp = (FilePart)p; + if (!fp.getFilename().equals(filename)) { + System.err.println("Error: wrong file received: `" + fp.getFilename() + "`"); + return true; + } + Long offset = Long.valueOf(fp.getOffset()); + if (pendingTasks.contains(offset)) { + try { + Files.write(new File(partsSubdir + filename + "_" + offset + ".part").toPath(), fp.getPartialContent()); + } catch (IOException e) { + System.err.println("Error: cannot write file (" + partsSubdir + filename + "_" + offset + ".part)"); + } + } else { + System.err.println("Error: wrong file part received."); + return true; + } + try { + synchronized(this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + pendingTasks.remove(offset); + tasksDone.add(offset); + tasksListsLock = false; + this.notifyAll(); + } + } catch(InterruptedException e) { + System.err.println("Error: DownloadPart Interrupted exception"); + return true; + } + } + } catch (EmptyDirectory e) { + System.err.println("Error: empty directory."); + return true; + } catch (EmptyFile e) { + System.err.println("Error: downloadPart emptyFile"); + // TODO: use more specific errors + return true; + } catch (ProtocolError e) { + System.err.println("Error: downloadPart protocolError"); + return true; + } catch (InternalRemoteError e) { + System.err.println("Error: downloadPart internalRemoteError"); + return true; + } catch (VersionRemoteError e) { + System.err.println("Error: downloadPart versionRemoteError"); + return true; + } catch (ProtocolRemoteError e) { + System.err.println("Error: downloadPart protocolRemoteError"); + return true; + } catch (TransmissionError e) { + System.err.println("Error: downloadPart transmissionError"); + return true; + } catch (VersionError e) { + System.err.println("Error: downloadPart versionError"); + return true; + } catch (SizeError e) { + System.err.println("Error: downloadPart sizeError"); + return true; + } catch (NotFound e) { + System.err.println("Error: downloadPart notFound"); + return true; + } catch (IOException e) { + System.err.println("Error: downloadPart ioexception"); + return true; + } catch (InternalError e) { + System.err.println("Error: downloadPart internalError"); + return true; + } + return false; + } + +} diff --git a/src/clientP2P/ClientDownloadUDP.java b/src/clientP2P/ClientDownloadUDP.java new file mode 100644 index 0000000..134ab2f --- /dev/null +++ b/src/clientP2P/ClientDownloadUDP.java @@ -0,0 +1,372 @@ +package clientP2P; +import clientP2P.ClientDownloadPartUDP; +import tools.HostItem; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Random; +import remoteException.EmptyDirectory; +import remoteException.EmptyFile; +import remoteException.VersionRemoteError; +import remoteException.ProtocolRemoteError; +import remoteException.NotFound; +import remoteException.InternalRemoteError; +import protocolP2P.HashAlgorithm; +import protocolP2P.HashResponse; +import protocolP2P.HashRequest; +import protocolP2P.ProtocolP2PPacketUDP; +import protocolP2P.Payload; +import exception.ProtocolError; +import exception.InternalError; +import exception.TransmissionError; +import exception.SizeError; +import exception.VersionError; +import protocolP2P.FilePart; +import protocolP2P.LoadRequest; +import java.io.IOException; +import java.nio.file.Files; +import java.io.File; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.nio.file.StandardCopyOption; + +/** Class to download file from udp +* @author Louis Royer +* @author Flavien Haas +* @author JS Auge +* @version 1.0 +*/ +public class ClientDownloadUDP implements Runnable { + + private List hostList; + private String filename; + private byte[] hash512; + private List sockList = new ArrayList(); + private List offsetsToAsk = new ArrayList(); + private List offsetsPending = new ArrayList(); + private boolean stop; + private long size; + private static final long MAX_PARTIAL_SIZE = 4096; + private String partsSubdir; + private String dirStorage; + private boolean success = false; + + /** Constructor with parameters: filename, list of hosts, parts subdirectory and dirStorage + * @param filename name of file to download + * @param hostList list of servers + * @param partsSubdir directory to store .part files + * @param dirStorage directory to write assembled file + */ + public ClientDownloadUDP(String filename, List hostList, String partsSubdir, String dirStorage) { + this.partsSubdir = partsSubdir; + this.dirStorage = dirStorage; + this.filename = filename; + this.hostList = hostList; + this.stop = false; + } + + /** Asks thread to stop + */ + public void setStop() { + stop = true; + } + + /** Runnable implementation + */ + public void run() { + try { + init(); + if (stop) { + System.err.println("File is smaller than part max size."); + hostList.get(0).closeUDPSocket(); + } else { + System.err.println("File is bigger than part max size."); + purgeList(); + initThreads(); + while(!stop) { + checkTasksStatus(); + assignTasks(); + } + } + System.err.println("Reassembling file parts."); + reassembleFile(); + } catch(InternalError e) { + System.err.println("Error while downloading file. Aborting."); + } finally { + stopTasks(); + } + } + + /** Starts threads for each server in hostList. + */ + private void initThreads() { + for(HostItem hostItem: hostList) { + sockList.add(new ClientDownloadPartUDP(filename, hostItem.getUDPSocket(), partsSubdir)); + } + for(ClientDownloadPartUDP c: sockList) { + Thread t = new Thread(c); + t.start(); + } + System.err.println("Threads initialized"); + } + + /** Remove tasks from failed threads. Update done status. + * @throws InternalError + */ + private void checkTasksStatus() throws InternalError { + List sockListCpy = new ArrayList<>(sockList); + for(ClientDownloadPartUDP c: sockListCpy) { + if (c.hasFailed() == true) { + sockList.remove(c); + offsetsPending.removeAll(c.getFailed()); + offsetsToAsk.addAll(c.getFailed()); + } + try { + offsetsPending.removeAll(c.getDone()); + } catch (InterruptedException e) { + throw new InternalError(); + } + } + System.err.println("Task check status: " + offsetsToAsk.size() + " to asks, " + offsetsPending.size() + " pending"); + if (offsetsToAsk.isEmpty() && offsetsPending.isEmpty()) { + stop = true; + } + if (sockList.size() == 0) { + System.err.println("No thread working"); + throw new InternalError(); + } + } + + /** Assign tasks randomly to threads. + * @throws InternalError + */ + private void assignTasks() throws InternalError { + Random rand = new Random(); + for(long offset : offsetsToAsk) { + try { + sockList.get(rand.nextInt(sockList.size())).assignTask(offset); + offsetsPending.add(offset); + System.err.println("Assigned task "+ offset); + } catch(InterruptedException e) { + throw new InternalError(); + } + } + offsetsToAsk.removeAll(offsetsPending); + } + + /** Stop threads */ + private void stopTasks() { + for(ClientDownloadPartUDP c : sockList) { + try { + c.setStop(); + } catch (InterruptedException e) {} + } + } + + /** Get hashsum from server. + * @param hostItem server to ask hash + * @return hash512sum + * @throws InternalError + */ + private byte[] getHashSum512(HostItem hostItem) throws InternalError { + byte[] hash; + HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; + hashesAlgo[0] = HashAlgorithm.SHA512; + ProtocolP2PPacketUDP d = new ProtocolP2PPacketUDP((Payload) new HashRequest(filename, hashesAlgo)); + try { + d.sendRequest((Object)hostItem.getUDPSocket()); + try { + Payload pHash = d.receiveResponse().getPayload(); + assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; + if (!(pHash instanceof HashResponse)) { + throw new InternalError(); + } else { + hash = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); + } + } catch (EmptyDirectory e) { + hash = new byte[0]; + } catch (NotFound e) { + hash = new byte[0]; + // TODO: use more specific errors + } catch (EmptyFile e) { + throw new InternalError(); + } catch (ProtocolError e) { + throw new InternalError(); + } catch (InternalRemoteError e) { + throw new InternalError(); + } catch (VersionRemoteError e) { + throw new InternalError(); + } catch (ProtocolRemoteError e) { + throw new InternalError(); + } catch (TransmissionError e) { + throw new InternalError(); + } catch (VersionError e) { + throw new InternalError(); + } catch (SizeError e) { + throw new InternalError(); + } + return hash; + } catch (IOException e) { + throw new InternalError(); + } + } + + /** Removes servers not owning the correct file to download from list. + * This is done by comparing hash512sum. + * @throws InternalError + */ + private void purgeList() throws InternalError { + List blackList = new ArrayList(); + boolean first = false; + byte[] hashsum; + for(HostItem host: hostList) { + // already have hashsum from 1st server + if (!first) { + first = true; + continue; + } + // ask hashsum + hashsum = getHashSum512(host); + if (!Arrays.equals(hash512, hashsum)) { + blackList.add(host); + } + } + // purge list + for(HostItem host: blackList) { + hostList.remove(host); + } + System.err.println("Host list purge: done"); + } + + /** Getter for hash512sum + * @return hash512sum + */ + public byte[] getHashSum512() { + return hash512; + } + + /** Initialize infos about file to download (size, hash512sum, partslist to dl). + * Also download first partfile (to get size). + * @throws InternalError + */ + private void init() throws InternalError { + // get size + setSize(); + + // get hashsum from 1st server in list + hash512 = getHashSum512(hostList.get(0)); + if (hash512.length == 0) { + System.err.println("Error: no hash512sum support."); + throw new InternalError(); + } + + // Add tasks + if (!stop) { + for(long i=MAX_PARTIAL_SIZE; i= size) { + success = true; + System.err.println("Reassembling: success"); + } else { + // append to file + try { + Files.write(new File(dirStorage + filename).toPath(), Files.readAllBytes(new File(partsSubdir + filename + "_" + nextOffset + ".part").toPath()), StandardOpenOption.APPEND); + nextOffset = (new File(dirStorage + filename)).length(); + } catch (IOException e) { + abort = true; + System.err.println("Aborting: bad number " + nextOffset); + } + } + } while((!success) && (!abort)); + } +} diff --git a/src/clientP2P/ClientManagementUDP.java b/src/clientP2P/ClientManagementUDP.java index 095e8ed..2e7c045 100644 --- a/src/clientP2P/ClientManagementUDP.java +++ b/src/clientP2P/ClientManagementUDP.java @@ -34,6 +34,7 @@ import protocolP2P.HashRequest; import protocolP2P.HashResponse; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import clientP2P.ClientDownloadUDP; /** Implementation of P2P-JAVA-PROJECT CLIENT * @author Louis Royer @@ -43,6 +44,7 @@ import java.security.NoSuchAlgorithmException; */ public class ClientManagementUDP implements Runnable { private String baseDirectory; + private String partsSubdir; private List hostList; /** Constructor for UDP implementation, with baseDirectory and UDPPort parameters. @@ -50,9 +52,10 @@ public class ClientManagementUDP implements Runnable { * @param host hostname of the server * @param UDPPort the server will listen on this port */ - public ClientManagementUDP(String baseDirectory, List hostList) { + public ClientManagementUDP(String baseDirectory, List hostList, String partsSubdir) { this.baseDirectory = baseDirectory; this.hostList = hostList; + this.partsSubdir = partsSubdir; } /** Implementation of Runnable @@ -115,106 +118,32 @@ public class ClientManagementUDP implements Runnable { * @throws EmptyFile */ private void download(String filename) throws EmptyFile, NotFound, InternalError, UnknownHostException, IOException, TransmissionError, ProtocolError, VersionError, SizeError, InternalRemoteError, ProtocolRemoteError, VersionRemoteError { - byte[] hash512; - final long MAX_PARTIAL_SIZE = 4096; - HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; - hashesAlgo[0] = HashAlgorithm.SHA512; - ProtocolP2PPacketUDP d = new ProtocolP2PPacketUDP((Payload) new HashRequest(filename, hashesAlgo)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); + ClientDownloadUDP downLoader = new ClientDownloadUDP(filename, hostList, partsSubdir, baseDirectory); + Thread t = new Thread(downLoader); + t.start(); try { - Payload pHash = d.receiveResponse().getPayload(); - assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; - if (!(pHash instanceof HashResponse)) { - throw new InternalError(); - } else { - hash512 = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); - if (hash512.length == 0) { - System.err.println("Error: server do not support sha512 hashes"); - throw new InternalError(); - } - } - } catch (EmptyDirectory e) { - System.err.println("Error: empty directory, but request was not LIST_REQUEST"); - throw new ProtocolError(); - } - - d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, 0, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); - boolean fileFullyWritten = false; - long offset = 0; - do { - try { - Payload p = d.receiveResponse().getPayload(); - assert p instanceof FilePart : "This payload must be instance of FilePart"; - if (!(p instanceof FilePart)) { - throw new InternalError(); - } else { - FilePart fp = (FilePart)p; - if (!fp.getFilename().equals(filename)) { - System.err.println("Error: wrong file received: `" + fp.getFilename() + "`"); - throw new ProtocolError(); + t.join(); + if (downLoader.getSuccess()) { + byte[] hash512 = downLoader.getHashSum512(); + if (!Arrays.equals(hash512, computeHashsum(filename, HashAlgorithm.SHA512))) { + System.err.println("Error: Hashsum does not match"); + System.err.println("Computed checksum:"); + byte[] c = computeHashsum(filename, HashAlgorithm.SHA512); + for (byte b: c) { + System.err.print(String.format("%02X", b)); } - if (fp.getOffset() == 0) { - System.err.println("Receiving first partialContent"); - // first partialContent - // increment offset - offset = fp.getPartialContent().length; - /* write first partialContent */ - try { - Files.write(new File(baseDirectory + filename).toPath(), fp.getPartialContent()); - } catch (IOException e) { - System.err.println("Error: cannot write file (" + baseDirectory + filename + ")"); - } - // next partialContentRequest - if (offset != fp.getTotalSize()) { - System.err.println("Sending following request with offset: " + offset + " maxpartialsize: " + MAX_PARTIAL_SIZE); - d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, offset, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); - } else { - fileFullyWritten = true; - } - } else if (offset == fp.getOffset()){ - System.err.println("Receiving following partialContent (offset: " + offset + ")"); - // following - // increment offset - offset += fp.getPartialContent().length; - /* write following partialContent at end of file*/ - try { - Files.write(Paths.get(baseDirectory + filename), fp.getPartialContent(), StandardOpenOption.APPEND); - } catch (IOException e) { - System.err.println("Error: cannot write file (" + baseDirectory + filename + ")"); - } - if (offset >= fp.getTotalSize()) { - fileFullyWritten = true; - } else { - // next partialContentRequest - d = new ProtocolP2PPacketUDP((Payload) new LoadRequest(filename, offset, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getUDPSocket()); - } - - } else { - System.err.println("offset: " + fp.getOffset() + " ; content.length: " + fp.getPartialContent().length + " ; totalSize: " + fp.getTotalSize()); - System.err.println("Error: cannot handle non-consecutive partial files (not implemented)"); - throw new InternalError(); + System.err.println(""); + System.err.println("Received checksum:"); + for (byte b: hash512) { + System.err.print(String.format("%02X", b)); } + System.err.println(""); + throw new InternalError(); } - } catch (EmptyDirectory e) { - throw new ProtocolError(); - } - } while(!fileFullyWritten); - if (!Arrays.equals(hash512, computeHashsum(filename, HashAlgorithm.SHA512))) { - System.err.println("Error: Hashsum does not match"); - System.err.println("Computed checksum:"); - byte[] c = computeHashsum(filename, HashAlgorithm.SHA512); - for (byte b: c) { - System.err.print(String.format("%02X", b)); - } - System.err.println(""); - System.err.println("Received checksum:"); - for (byte b: hash512) { - System.err.print(String.format("%02X", b)); + } else { + throw new InternalError(); } - System.err.println(""); + } catch (InterruptedException e) { throw new InternalError(); } } diff --git a/src/clientP2P/ClientP2P.java b/src/clientP2P/ClientP2P.java index f5ab3db..606f2cd 100644 --- a/src/clientP2P/ClientP2P.java +++ b/src/clientP2P/ClientP2P.java @@ -35,7 +35,7 @@ public class ClientP2P { case "udp": case "2" : System.out.println("Starting with UDP"); - ClientManagementUDP cmudp = new ClientManagementUDP(c.directories.getDataHomeDirectory(), c.hostList); + ClientManagementUDP cmudp = new ClientManagementUDP(c.directories.getDataHomeDirectory(), c.hostList, c.directories.getDataHomeDirectory() + c.parts + "/"); t = new Thread(cmudp); break; case "TCP": From 4c7fcab67d738ecf2af92e3c72fc1e7b35613415 Mon Sep 17 00:00:00 2001 From: Louis Date: Mon, 16 Mar 2020 15:56:47 +0100 Subject: [PATCH 2/5] Improvement multithreads --- src/clientP2P/ClientDownloadPartUDP.java | 11 +++++- src/clientP2P/ClientDownloadUDP.java | 48 ++++++++++++++---------- src/clientP2P/ClientP2P.java | 1 + src/serverP2P/ServerP2P.java | 11 ++++-- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/clientP2P/ClientDownloadPartUDP.java b/src/clientP2P/ClientDownloadPartUDP.java index 4ec0391..30c373c 100644 --- a/src/clientP2P/ClientDownloadPartUDP.java +++ b/src/clientP2P/ClientDownloadPartUDP.java @@ -41,13 +41,15 @@ public class ClientDownloadPartUDP implements Runnable { private volatile boolean noTask; private String partsSubdir; private static final long MAX_PARTIAL_SIZE = 4096; + private ClientDownloadUDP manager; /** Constructor with filename, socket, and part subdir * @param filename name of file to download * @param socket socket to use * @param partsSubdir directory to store .part files */ - public ClientDownloadPartUDP(String filename, DatagramSocket socket, String partsSubdir) { + public ClientDownloadPartUDP(ClientDownloadUDP manager, String filename, DatagramSocket socket, String partsSubdir) { + this.manager = manager; this.partsSubdir = partsSubdir; this.filename = filename; this.socket = socket; @@ -80,9 +82,15 @@ public class ClientDownloadPartUDP implements Runnable { while(!stop) { try { doTasks(); + synchronized(manager) { + manager.notify(); + } } catch(InterruptedException e) { try { setStop(); + synchronized(manager) { + manager.notify(); + } } catch (InterruptedException e2) { } } @@ -168,6 +176,7 @@ public class ClientDownloadPartUDP implements Runnable { } catch (IndexOutOfBoundsException e) { noTask = true; } + } } diff --git a/src/clientP2P/ClientDownloadUDP.java b/src/clientP2P/ClientDownloadUDP.java index 134ab2f..2bd2f29 100644 --- a/src/clientP2P/ClientDownloadUDP.java +++ b/src/clientP2P/ClientDownloadUDP.java @@ -84,8 +84,9 @@ public class ClientDownloadUDP implements Runnable { purgeList(); initThreads(); while(!stop) { - checkTasksStatus(); assignTasks(); + checkTasksStatus(); + } } System.err.println("Reassembling file parts."); @@ -101,7 +102,7 @@ public class ClientDownloadUDP implements Runnable { */ private void initThreads() { for(HostItem hostItem: hostList) { - sockList.add(new ClientDownloadPartUDP(filename, hostItem.getUDPSocket(), partsSubdir)); + sockList.add(new ClientDownloadPartUDP(this, filename, hostItem.getUDPSocket(), partsSubdir)); } for(ClientDownloadPartUDP c: sockList) { Thread t = new Thread(c); @@ -114,25 +115,32 @@ public class ClientDownloadUDP implements Runnable { * @throws InternalError */ private void checkTasksStatus() throws InternalError { - List sockListCpy = new ArrayList<>(sockList); - for(ClientDownloadPartUDP c: sockListCpy) { - if (c.hasFailed() == true) { - sockList.remove(c); - offsetsPending.removeAll(c.getFailed()); - offsetsToAsk.addAll(c.getFailed()); - } - try { - offsetsPending.removeAll(c.getDone()); - } catch (InterruptedException e) { - throw new InternalError(); + try { + synchronized(this) { + this.wait(); + List sockListCpy = new ArrayList<>(sockList); + for(ClientDownloadPartUDP c: sockListCpy) { + if (c.hasFailed() == true) { + sockList.remove(c); + offsetsPending.removeAll(c.getFailed()); + offsetsToAsk.addAll(c.getFailed()); + } + try { + offsetsPending.removeAll(c.getDone()); + } catch (InterruptedException e) { + throw new InternalError(); + } + } + System.err.println("Task check status: " + offsetsToAsk.size() + " to asks, " + offsetsPending.size() + " pending"); + if (offsetsToAsk.isEmpty() && offsetsPending.isEmpty()) { + stop = true; + } + if (sockList.size() == 0) { + System.err.println("No thread working"); + throw new InternalError(); + } } - } - System.err.println("Task check status: " + offsetsToAsk.size() + " to asks, " + offsetsPending.size() + " pending"); - if (offsetsToAsk.isEmpty() && offsetsPending.isEmpty()) { - stop = true; - } - if (sockList.size() == 0) { - System.err.println("No thread working"); + } catch (InterruptedException e) { throw new InternalError(); } } diff --git a/src/clientP2P/ClientP2P.java b/src/clientP2P/ClientP2P.java index 606f2cd..a1aa92a 100644 --- a/src/clientP2P/ClientP2P.java +++ b/src/clientP2P/ClientP2P.java @@ -33,6 +33,7 @@ public class ClientP2P { switch(transportchoosen){ case "UDP": case "udp": + case "upd": // alias typo case "2" : System.out.println("Starting with UDP"); ClientManagementUDP cmudp = new ClientManagementUDP(c.directories.getDataHomeDirectory(), c.hostList, c.directories.getDataHomeDirectory() + c.parts + "/"); diff --git a/src/serverP2P/ServerP2P.java b/src/serverP2P/ServerP2P.java index 9b068c1..98b7b23 100644 --- a/src/serverP2P/ServerP2P.java +++ b/src/serverP2P/ServerP2P.java @@ -11,16 +11,19 @@ public class ServerP2P { private Logger logger; - public ServerP2P() { - directories = new Directories("P2P_JAVA_PROJECT_SERVER"); + public ServerP2P(String portStr) { + port = Integer.valueOf(Integer.parseInt(portStr)); + directories = new Directories("P2P_JAVA_PROJECT_SERVER_" + port); directories.createSubdir(subdir); logger = new Logger(directories.getDataHomeDirectory() + "server.log"); - port = 40001; System.out.println("Server will listen on port " + port + " and serve files from " + directories.getDataHomeDirectory() + subdir); directories.askOpenDataHomeDirectory(subdir); } public static void main(String [] args) { - ServerP2P s = new ServerP2P(); + /* first arg must be port number + * run with: java -ea serverP2P.ServerP2P -- + * */ + ServerP2P s = new ServerP2P(args[1]); ServerManagementUDP smudp = new ServerManagementUDP(s.directories.getDataHomeDirectory() + subdir, s.port, s.logger); ServerManagementTCP smtcp = new ServerManagementTCP(s.directories.getDataHomeDirectory() + subdir, s.port, s.logger); Thread tudp = new Thread(smudp); From 87013ceea86cbb144ae5c2de9c474744a5e85668 Mon Sep 17 00:00:00 2001 From: Flavien Haas Date: Wed, 18 Mar 2020 16:26:21 +0100 Subject: [PATCH 3/5] tcp multiple (#32) --- src/clientP2P/ClientDownloadPartTCP.java | 317 +++++++++++++++++++ src/clientP2P/ClientDownloadTCP.java | 386 +++++++++++++++++++++++ src/clientP2P/ClientManagementTCP.java | 154 +++------ src/clientP2P/ClientP2P.java | 2 +- 4 files changed, 742 insertions(+), 117 deletions(-) create mode 100644 src/clientP2P/ClientDownloadPartTCP.java create mode 100644 src/clientP2P/ClientDownloadTCP.java diff --git a/src/clientP2P/ClientDownloadPartTCP.java b/src/clientP2P/ClientDownloadPartTCP.java new file mode 100644 index 0000000..11a277c --- /dev/null +++ b/src/clientP2P/ClientDownloadPartTCP.java @@ -0,0 +1,317 @@ +package clientP2P; +import java.util.List; +import java.util.ArrayList; +import java.net.Socket; +import protocolP2P.ProtocolP2PPacketTCP; +import protocolP2P.Payload; +import protocolP2P.LoadRequest; +import protocolP2P.FilePart; +import exception.InternalError; +import remoteException.EmptyDirectory; +import remoteException.EmptyFile; +import exception.ProtocolError; +import remoteException.InternalRemoteError; +import remoteException.VersionRemoteError; +import exception.TransmissionError; +import remoteException.ProtocolRemoteError; +import exception.VersionError; +import exception.SizeError; +import remoteException.NotFound; +import java.nio.file.Files; +import java.io.File; +import java.nio.file.Paths; +import java.io.IOException; +import exception.SocketClosed; + +/** Class to download file parts on tcp. +* @author Louis Royer +* @author Flavien Haas +* @author JS Auge +* @version 1.0 +*/ +public class ClientDownloadPartTCP implements Runnable { + + private List toDoTasks; + private List pendingTasks; + private List tasksDone; + private volatile boolean tasksListsLock; + private volatile boolean stop; + private volatile boolean failed; + private String filename; + private Socket socket; + private volatile boolean noTask; + private String partsSubdir; + private static final long MAX_PARTIAL_SIZE = 4096; + private ClientDownloadTCP manager; + + /** Constructor with filename, socket, and part subdir + * @param filename name of file to download + * @param socket socket to use + * @param partsSubdir directory to store .part files + */ + public ClientDownloadPartTCP(ClientDownloadTCP manager, String filename, Socket socket, String partsSubdir) { + this.manager = manager; + this.partsSubdir = partsSubdir; + this.filename = filename; + this.socket = socket; + stop = false; + failed = false; + pendingTasks = new ArrayList<>(); + toDoTasks = new ArrayList<>(); + tasksDone = new ArrayList<>(); + noTask = true; + tasksListsLock = false; + } + + /** True if thread has failed to get a file. + * @return true if thread has failed to get a file + */ + public boolean hasFailed() { + return failed; + } + + /** Asks to stop thread. + * @throws InterruptedException + */ + public synchronized void setStop() throws InterruptedException { + stop = true; + this.notifyAll(); + } + + /** Runnable implementation */ + public void run() { + while(!stop) { + try { + doTasks(); + synchronized(manager) { + manager.notify(); + } + } catch(InterruptedException e) { + try { + setStop(); + synchronized(manager) { + manager.notify(); + } + } catch (InterruptedException e2) { + } + } + } + System.err.println("Closing socket"); + try{ + socket.close(); + } catch(IOException e){ + System.err.println("can't close socket"); + } + } + + /** Get list of offsets that have not be downloaded if failed, else + * empty list. + * @return list of offsets + */ + public List getFailed() { + List ret = new ArrayList<>(); + if (failed) { + ret.addAll(pendingTasks); + ret.addAll(toDoTasks); + } + return ret; + } + + /** Get list of downloaded file parts offset, then clear this list. + * @return list of offsets + * @throws InterruptedException + */ + public List getDone() throws InterruptedException { + if (tasksDone.size() == 0) { + return new ArrayList<>(); + } else { + synchronized (this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + List ret = new ArrayList<>(tasksDone); + tasksDone.clear(); + tasksListsLock = false; + this.notifyAll(); + return ret; + } + } + } + + /** Adds offset of files parts to download. + * @param task offset to download + * @throws InterruptedException + */ + public synchronized void assignTask(Long task) throws InterruptedException { + synchronized(this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + toDoTasks.add(task); + noTask = false; + tasksListsLock = false; + this.notifyAll(); + } + } + + /** Send one request and wait for one response. Blocks when no task. + * @throws InterruptedException + */ + public synchronized void doTasks() throws InterruptedException { + while(noTask && !stop) { + this.wait(); + } + if (!stop) { + try { + Long offset = toDoTasks.get(0); + ProtocolP2PPacketTCP p = reqPart(offset); + if (p == null) { + stop = true; + } + + failed = downloadPart(p); + if (failed) { + System.err.println("Error: DownloadPart failed."); + stop = true; + } else if (toDoTasks.isEmpty()) { + noTask = true; + } + } catch (IndexOutOfBoundsException e) { + noTask = true; + } + + } + } + + public ProtocolP2PPacketTCP reqPart(Long offset) { + System.err.println("New request: "+ offset); + // maintain tracking of tasks + if (toDoTasks.contains(offset)) { + try { + synchronized (this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + toDoTasks.remove(offset); + pendingTasks.add(offset); + tasksListsLock = false; + this.notifyAll(); + } + } catch(InterruptedException e) { + System.err.println("Error: reqPart interruptedException"); + return null; + } + } else { + System.err.println("Error: reqPart (offset " + offset + " not in toDoTasks)"); + return null; + } + // send request + try { + ProtocolP2PPacketTCP d = new ProtocolP2PPacketTCP((Payload) new LoadRequest(filename, offset.longValue(), MAX_PARTIAL_SIZE)); + d.sendRequest((Object)socket); + return d; + } catch (InternalError e) { + System.err.println("Error: reqPart internalError"); + return null; + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Error: reqPart ioexception"); + return null; + } catch (SocketClosed e){ + System.err.println("Error: reqPart SocketClosed"); + return null; + } + } + + public boolean downloadPart(ProtocolP2PPacketTCP d) { + if (d == null) { + System.err.println("Error: downloadPart -> d is null."); + return true; + } + try { + Payload p = d.receiveResponse().getPayload(); + assert p instanceof FilePart : "This payload must be instance of FilePart"; + if (!(p instanceof FilePart)) { + System.err.println("Error: cannot get size."); + return true; + } else { + FilePart fp = (FilePart)p; + if (!fp.getFilename().equals(filename)) { + System.err.println("Error: wrong file received: `" + fp.getFilename() + "`"); + return true; + } + Long offset = Long.valueOf(fp.getOffset()); + if (pendingTasks.contains(offset)) { + try { + Files.write(new File(partsSubdir + filename + "_" + offset + ".part").toPath(), fp.getPartialContent()); + } catch (IOException e) { + System.err.println("Error: cannot write file (" + partsSubdir + filename + "_" + offset + ".part)"); + } + } else { + System.err.println("Error: wrong file part received."); + return true; + } + try { + synchronized(this) { + while(tasksListsLock) { + this.wait(); + } + tasksListsLock = true; + pendingTasks.remove(offset); + tasksDone.add(offset); + tasksListsLock = false; + this.notifyAll(); + } + } catch(InterruptedException e) { + System.err.println("Error: DownloadPart Interrupted exception"); + return true; + } + } + } catch (EmptyDirectory e) { + System.err.println("Error: empty directory."); + return true; + } catch (EmptyFile e) { + System.err.println("Error: downloadPart emptyFile"); + // TODO: use more specific errors + return true; + } catch (ProtocolError e) { + System.err.println("Error: downloadPart protocolError"); + return true; + } catch (InternalRemoteError e) { + System.err.println("Error: downloadPart internalRemoteError"); + return true; + } catch (VersionRemoteError e) { + System.err.println("Error: downloadPart versionRemoteError"); + return true; + } catch (ProtocolRemoteError e) { + System.err.println("Error: downloadPart protocolRemoteError"); + return true; + } catch (TransmissionError e) { + System.err.println("Error: downloadPart transmissionError"); + return true; + } catch (VersionError e) { + System.err.println("Error: downloadPart versionError"); + return true; + } catch (SizeError e) { + System.err.println("Error: downloadPart sizeError"); + return true; + } catch (NotFound e) { + System.err.println("Error: downloadPart notFound"); + return true; + } catch (IOException e) { + System.err.println("Error: downloadPart ioexception"); + return true; + } catch (InternalError e) { + System.err.println("Error: downloadPart internalError"); + return true; + } catch (SocketClosed e){ + System.err.println("Error: downloadPart SocketClosed"); + return true; + } + return false; + } + +} diff --git a/src/clientP2P/ClientDownloadTCP.java b/src/clientP2P/ClientDownloadTCP.java new file mode 100644 index 0000000..442d105 --- /dev/null +++ b/src/clientP2P/ClientDownloadTCP.java @@ -0,0 +1,386 @@ +package clientP2P; +import clientP2P.ClientDownloadPartTCP; +import tools.HostItem; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Random; +import remoteException.EmptyDirectory; +import remoteException.EmptyFile; +import remoteException.VersionRemoteError; +import remoteException.ProtocolRemoteError; +import remoteException.NotFound; +import remoteException.InternalRemoteError; +import protocolP2P.HashAlgorithm; +import protocolP2P.HashResponse; +import protocolP2P.HashRequest; +import protocolP2P.ProtocolP2PPacketTCP; +import protocolP2P.Payload; +import exception.ProtocolError; +import exception.InternalError; +import exception.TransmissionError; +import exception.SizeError; +import exception.VersionError; +import protocolP2P.FilePart; +import protocolP2P.LoadRequest; +import java.io.IOException; +import java.nio.file.Files; +import java.io.File; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.nio.file.StandardCopyOption; +import exception.SocketClosed; + +/** Class to download file from tcp +* @author Louis Royer +* @author Flavien Haas +* @author JS Auge +* @version 1.0 +*/ +public class ClientDownloadTCP implements Runnable { + + private List hostList; + private String filename; + private byte[] hash512; + private List sockList = new ArrayList(); + private List offsetsToAsk = new ArrayList(); + private List offsetsPending = new ArrayList(); + private boolean stop; + private long size; + private static final long MAX_PARTIAL_SIZE = 4096; + private String partsSubdir; + private String dirStorage; + private boolean success = false; + + /** Constructor with parameters: filename, list of hosts, parts subdirectory and dirStorage + * @param filename name of file to download + * @param hostList list of servers + * @param partsSubdir directory to store .part files + * @param dirStorage directory to write assembled file + */ + public ClientDownloadTCP(String filename, List hostList, String partsSubdir, String dirStorage) { + this.partsSubdir = partsSubdir; + this.dirStorage = dirStorage; + this.filename = filename; + this.hostList = hostList; + this.stop = false; + } + + /** Asks thread to stop + */ + public void setStop() { + stop = true; + } + + /** Runnable implementation + */ + public void run() { + try { + init(); + if (stop) { + System.err.println("File is smaller than part max size."); + hostList.get(0).closeTCPSocket(); + } else { + System.err.println("File is bigger than part max size."); + purgeList(); + initThreads(); + while(!stop) { + assignTasks(); + checkTasksStatus(); + + } + } + System.err.println("Reassembling file parts."); + reassembleFile(); + } catch(InternalError e) { + System.err.println("Error while downloading file. Aborting."); + } finally { + stopTasks(); + } + } + + /** Starts threads for each server in hostList. + */ + private void initThreads() { + for(HostItem hostItem: hostList) { + sockList.add(new ClientDownloadPartTCP(this, filename, hostItem.getTCPSocket(), partsSubdir)); + } + for(ClientDownloadPartTCP c: sockList) { + Thread t = new Thread(c); + t.start(); + } + System.err.println("Threads initialized"); + } + + /** Remove tasks from failed threads. Update done status. + * @throws InternalError + */ + private void checkTasksStatus() throws InternalError { + try { + synchronized(this) { + this.wait(); + List sockListCpy = new ArrayList<>(sockList); + for(ClientDownloadPartTCP c: sockListCpy) { + if (c.hasFailed() == true) { + sockList.remove(c); + offsetsPending.removeAll(c.getFailed()); + offsetsToAsk.addAll(c.getFailed()); + } + try { + offsetsPending.removeAll(c.getDone()); + } catch (InterruptedException e) { + throw new InternalError(); + } + } + System.err.println("Task check status: " + offsetsToAsk.size() + " to asks, " + offsetsPending.size() + " pending"); + if (offsetsToAsk.isEmpty() && offsetsPending.isEmpty()) { + stop = true; + } + if (sockList.size() == 0) { + System.err.println("No thread working"); + throw new InternalError(); + } + } + } catch (InterruptedException e) { + throw new InternalError(); + } + } + + /** Assign tasks randomly to threads. + * @throws InternalError + */ + private void assignTasks() throws InternalError { + Random rand = new Random(); + for(long offset : offsetsToAsk) { + try { + sockList.get(rand.nextInt(sockList.size())).assignTask(offset); + offsetsPending.add(offset); + System.err.println("Assigned task "+ offset); + } catch(InterruptedException e) { + throw new InternalError(); + } + } + offsetsToAsk.removeAll(offsetsPending); + } + + /** Stop threads */ + private void stopTasks() { + for(ClientDownloadPartTCP c : sockList) { + try { + c.setStop(); + } catch (InterruptedException e) {} + } + } + + /** Get hashsum from server. + * @param hostItem server to ask hash + * @return hash512sum + * @throws InternalError + */ + private byte[] getHashSum512(HostItem hostItem) throws InternalError { + byte[] hash; + HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; + hashesAlgo[0] = HashAlgorithm.SHA512; + ProtocolP2PPacketTCP d = new ProtocolP2PPacketTCP((Payload) new HashRequest(filename, hashesAlgo)); + try { + d.sendRequest((Object)hostItem.getTCPSocket()); + try { + Payload pHash = d.receiveResponse().getPayload(); + assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; + if (!(pHash instanceof HashResponse)) { + throw new InternalError(); + } else { + hash = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); + } + } catch (EmptyDirectory e) { + hash = new byte[0]; + } catch (NotFound e) { + hash = new byte[0]; + // TODO: use more specific errors + } catch (EmptyFile e) { + throw new InternalError(); + } catch (ProtocolError e) { + throw new InternalError(); + } catch (InternalRemoteError e) { + throw new InternalError(); + } catch (VersionRemoteError e) { + throw new InternalError(); + } catch (ProtocolRemoteError e) { + throw new InternalError(); + } catch (TransmissionError e) { + throw new InternalError(); + } catch (VersionError e) { + throw new InternalError(); + } catch (SizeError e) { + throw new InternalError(); + } + return hash; + } catch (IOException e) { + throw new InternalError(); + } catch (SocketClosed e){ + System.err.println("getHashSum512 : SocketClosed"); + throw new InternalError(); + } + } + + /** Removes servers not owning the correct file to download from list. + * This is done by comparing hash512sum. + * @throws InternalError + */ + private void purgeList() throws InternalError { + List blackList = new ArrayList(); + boolean first = false; + byte[] hashsum; + for(HostItem host: hostList) { + // already have hashsum from 1st server + if (!first) { + first = true; + continue; + } + // ask hashsum + hashsum = getHashSum512(host); + if (!Arrays.equals(hash512, hashsum)) { + blackList.add(host); + } + } + // purge list + for(HostItem host: blackList) { + hostList.remove(host); + } + System.err.println("Host list purge: done"); + } + + /** Getter for hash512sum + * @return hash512sum + */ + public byte[] getHashSum512() { + return hash512; + } + + /** Initialize infos about file to download (size, hash512sum, partslist to dl). + * Also download first partfile (to get size). + * @throws InternalError + */ + private void init() throws InternalError { + // get size + setSize(); + + // get hashsum from 1st server in list + hash512 = getHashSum512(hostList.get(0)); + if (hash512.length == 0) { + System.err.println("Error: no hash512sum support."); + throw new InternalError(); + } + + // Add tasks + if (!stop) { + for(long i=MAX_PARTIAL_SIZE; i= size) { + success = true; + System.err.println("Reassembling: success"); + } else { + // append to file + try { + Files.write(new File(dirStorage + filename).toPath(), Files.readAllBytes(new File(partsSubdir + filename + "_" + nextOffset + ".part").toPath()), StandardOpenOption.APPEND); + nextOffset = (new File(dirStorage + filename)).length(); + } catch (IOException e) { + abort = true; + System.err.println("Aborting: bad number " + nextOffset); + } + } + } while((!success) && (!abort)); + } +} diff --git a/src/clientP2P/ClientManagementTCP.java b/src/clientP2P/ClientManagementTCP.java index 728cd8e..66878c7 100644 --- a/src/clientP2P/ClientManagementTCP.java +++ b/src/clientP2P/ClientManagementTCP.java @@ -4,7 +4,6 @@ import exception.ProtocolError; import exception.SizeError; import exception.TransmissionError; import exception.VersionError; -import exception.SocketClosed; import remoteException.EmptyFile; import remoteException.EmptyDirectory; import remoteException.InternalRemoteError; @@ -13,15 +12,16 @@ import remoteException.ProtocolRemoteError; import remoteException.VersionRemoteError; import java.net.UnknownHostException; import java.util.Scanner; -import java.net.InetAddress; -import java.net.SocketException; -import java.net.Socket; +//import java.net.InetAddress; +//import java.net.SocketException; import java.io.IOException; import java.nio.file.Files; import java.io.File; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Arrays; +import java.util.List; +import tools.HostItem; import protocolP2P.ProtocolP2PPacketTCP; import protocolP2P.Payload; import protocolP2P.RequestResponseCode; @@ -33,8 +33,8 @@ import protocolP2P.HashRequest; import protocolP2P.HashResponse; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import tools.HostItem; -import java.util.List; +import clientP2P.ClientDownloadTCP; +import exception.SocketClosed; /** Implementation of P2P-JAVA-PROJECT CLIENT * @author Louis Royer @@ -44,30 +44,32 @@ import java.util.List; */ public class ClientManagementTCP implements Runnable { private String baseDirectory; + private String partsSubdir; private List hostList; - /** Constructor for TCP implementation, with baseDirectory and TCPPort parameters. * @param baseDirectory the root directory where files are stored * @param host hostname of the server * @param TCPPort the server will listen on this port */ - public ClientManagementTCP(String baseDirectory, List hostList) { + public ClientManagementTCP(String baseDirectory, List hostList, String partsSubdir) { this.baseDirectory = baseDirectory; this.hostList = hostList; + this.partsSubdir = partsSubdir; } /** Implementation of Runnable */ public void run() { try { + System.out.println("Enter all servers: type \"stop\" when finished"); + Scanner scanner = new Scanner(System.in); String[] list = listDirectory(); System.out.println("Files present on the server:"); for(String listItem: list) { System.out.println(listItem); } System.out.println("Name of the file to download:"); - Scanner scanner = new Scanner(System.in); String f = scanner.nextLine(); download(f); System.out.println("File sucessfully downloaded"); @@ -77,8 +79,6 @@ public class ClientManagementTCP implements Runnable { System.err.println("Error: Client internal error"); } catch (UnknownHostException e) { System.err.println("Error: Server host is unknown"); - } catch (SocketClosed e) { - System.err.println("Error: Request cannot be send or response cannot be received"); } catch (IOException e) { System.err.println("Error: Request cannot be send or response cannot be received"); } catch (TransmissionError e) { @@ -99,10 +99,6 @@ public class ClientManagementTCP implements Runnable { System.err.println("Error: Server has not this file in directory"); } catch (EmptyFile e) { System.err.println("Error: File is empty"); - } finally { - for (HostItem h: hostList) { - h.closeTCPSocket(); - } } } @@ -112,7 +108,6 @@ public class ClientManagementTCP implements Runnable { * @throws InternalError * @throws UnknownHostException * @throws IOException - * @throws SocketClosed * @throws TransmissionError * @throws ProtocolError * @throws VersionError @@ -122,117 +117,41 @@ public class ClientManagementTCP implements Runnable { * @throws VersionRemoteError * @throws EmptyFile */ - private void download(String filename) throws EmptyFile, NotFound, InternalError, UnknownHostException, IOException, SocketClosed, TransmissionError, ProtocolError, VersionError, SizeError, InternalRemoteError, ProtocolRemoteError, VersionRemoteError { - byte [] hash512; - final long MAX_PARTIAL_SIZE = 4096; - HashAlgorithm[] hashesAlgo = new HashAlgorithm[1]; - hashesAlgo[0] = HashAlgorithm.SHA512; - ProtocolP2PPacketTCP d = new ProtocolP2PPacketTCP((Payload) new HashRequest(filename, hashesAlgo)); - d.sendRequest((Object)hostList.get(0).getTCPSocket()); + private void download(String filename) throws EmptyFile, NotFound, InternalError, UnknownHostException, IOException, TransmissionError, ProtocolError, VersionError, SizeError, InternalRemoteError, ProtocolRemoteError, VersionRemoteError { + ClientDownloadTCP downLoader = new ClientDownloadTCP(filename, hostList, partsSubdir, baseDirectory); + Thread t = new Thread(downLoader); + t.start(); try { - Payload pHash = d.receiveResponse().getPayload(); - assert pHash instanceof HashResponse : "This payload must be instance of HashResponse"; - if (!(pHash instanceof HashResponse)) { - throw new InternalError(); - } else { - hash512 = ((HashResponse)pHash).getHash(HashAlgorithm.SHA512); - if (hash512.length == 0) { - System.err.println("Error: server do not support sha512 hashes"); - throw new InternalError(); - } - } - } catch (EmptyDirectory e) { - System.err.println("Error: empty directory, but request was not LIST_REQUEST"); - throw new ProtocolError(); - } - - d = new ProtocolP2PPacketTCP((Payload) new LoadRequest(filename, 0, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getTCPSocket()); - boolean fileFullyWritten = false; - long offset = 0; - do { - try { - Payload p = d.receiveResponse().getPayload(); - assert p instanceof FilePart : "This payload must be instance of FilePart"; - if (!(p instanceof FilePart)) { - throw new InternalError(); - } else { - FilePart fp = (FilePart)p; - if (!fp.getFilename().equals(filename)) { - System.err.println("Error: wrong file received: `" + fp.getFilename() + "`"); - throw new ProtocolError(); + t.join(); + if (downLoader.getSuccess()) { + byte[] hash512 = downLoader.getHashSum512(); + if (!Arrays.equals(hash512, computeHashsum(filename, HashAlgorithm.SHA512))) { + System.err.println("Error: Hashsum does not match"); + System.err.println("Computed checksum:"); + byte[] c = computeHashsum(filename, HashAlgorithm.SHA512); + for (byte b: c) { + System.err.print(String.format("%02X", b)); } - if (fp.getOffset() == 0) { - System.err.println("Receiving first partialContent"); - // first partialContent - // increment offset - offset = fp.getPartialContent().length; - /* write first partialContent */ - try { - Files.write(new File(baseDirectory + filename).toPath(), fp.getPartialContent()); - } catch (IOException e) { - System.err.println("Error: cannot write file (" + baseDirectory + filename + ")"); - } - // next partialContentRequest - if (offset != fp.getTotalSize()) { - System.err.println("Sending following request with offset: " + offset + " maxpartialsize: " + MAX_PARTIAL_SIZE); - d = new ProtocolP2PPacketTCP((Payload) new LoadRequest(filename, offset, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getTCPSocket()); - } else { - fileFullyWritten = true; - } - } else if (offset == fp.getOffset()){ - System.err.println("Receiving following partialContent (offset: " + offset + ")"); - // following - // increment offset - offset += fp.getPartialContent().length; - /* write following partialContent at end of file*/ - try { - Files.write(Paths.get(baseDirectory + filename), fp.getPartialContent(), StandardOpenOption.APPEND); - } catch (IOException e) { - System.err.println("Error: cannot write file (" + baseDirectory + filename + ")"); - } - if (offset >= fp.getTotalSize()) { - fileFullyWritten = true; - } else { - // next partialContentRequest - d = new ProtocolP2PPacketTCP((Payload) new LoadRequest(filename, offset, MAX_PARTIAL_SIZE)); - d.sendRequest((Object)hostList.get(0).getTCPSocket()); - } - - } else { - System.err.println("offset: " + fp.getOffset() + " ; content.length: " + fp.getPartialContent().length + " ; totalSize: " + fp.getTotalSize()); - System.err.println("Error: cannot handle non-consecutive partial files (not implemented)"); - throw new InternalError(); + System.err.println(""); + System.err.println("Received checksum:"); + for (byte b: hash512) { + System.err.print(String.format("%02X", b)); } + System.err.println(""); + throw new InternalError(); } - } catch (EmptyDirectory e) { - throw new ProtocolError(); - } - } while(!fileFullyWritten); - if (!Arrays.equals(hash512, computeHashsum(filename, HashAlgorithm.SHA512))) { - System.err.println("Error: Hashsum does not match"); - System.err.println("Computed checksum:"); - byte[] c = computeHashsum(filename, HashAlgorithm.SHA512); - for (byte b: c) { - System.err.print(String.format("%02X", b)); - } - System.err.println(""); - System.err.println("Received checksum:"); - for (byte b: hash512) { - System.err.print(String.format("%02X", b)); + } else { + throw new InternalError(); } - System.err.println(""); + } catch (InterruptedException e) { throw new InternalError(); } - hostList.get(0).closeTCPSocket(); } /** list server’s directory content * @return list of files * @throws InternalError * @throws UnknowHostException - * @throws SocketClosed * @throws IOException * @throws TransmissionError * @throws ProtocolError @@ -243,10 +162,10 @@ public class ClientManagementTCP implements Runnable { * @throws ProtocolRemoteError * @throws VersionRemoteError */ - private String[] listDirectory() throws EmptyDirectory, InternalError, UnknownHostException, SocketClosed, IOException, TransmissionError, ProtocolError, VersionError, SizeError, InternalRemoteError, ProtocolRemoteError, VersionRemoteError { + private String[] listDirectory() throws EmptyDirectory, InternalError, UnknownHostException, IOException, TransmissionError, ProtocolError, VersionError, SizeError, InternalRemoteError, ProtocolRemoteError, VersionRemoteError { ProtocolP2PPacketTCP d = new ProtocolP2PPacketTCP(new Payload(RequestResponseCode.LIST_REQUEST)); - d.sendRequest((Object)hostList.get(0).getTCPSocket()); try { + d.sendRequest((Object)hostList.get(0).getTCPSocket()); Payload p = d.receiveResponse().getPayload(); assert p instanceof FileList : "This payload must be instance of Filelist"; if (!(p instanceof FileList)) { @@ -258,6 +177,9 @@ public class ClientManagementTCP implements Runnable { throw new ProtocolError(); } catch (EmptyFile e) { throw new ProtocolError(); + } catch (SocketClosed e){ + System.err.println("listDirectory : SocketClosed"); + throw new ProtocolError(); } } diff --git a/src/clientP2P/ClientP2P.java b/src/clientP2P/ClientP2P.java index a1aa92a..24050d3 100644 --- a/src/clientP2P/ClientP2P.java +++ b/src/clientP2P/ClientP2P.java @@ -44,7 +44,7 @@ public class ClientP2P { case "1": default: System.out.println("Starting with TCP"); - ClientManagementTCP cmtcp = new ClientManagementTCP(c.directories.getDataHomeDirectory(), c.hostList); + ClientManagementTCP cmtcp = new ClientManagementTCP(c.directories.getDataHomeDirectory(), c.hostList, c.directories.getDataHomeDirectory() + c.parts + "/"); t = new Thread(cmtcp); break; } From cb86478764a2e5c5a5d97a6a4cf1e40e7523661b Mon Sep 17 00:00:00 2001 From: flavien Date: Wed, 18 Mar 2020 17:46:37 +0100 Subject: [PATCH 4/5] Merging client and server --- src/clientP2P/ClientP2P.java | 57 +++++++++++++++++++++++++++++++----- src/tools/Logger.java | 4 +-- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/src/clientP2P/ClientP2P.java b/src/clientP2P/ClientP2P.java index 24050d3..c6f8b63 100644 --- a/src/clientP2P/ClientP2P.java +++ b/src/clientP2P/ClientP2P.java @@ -1,32 +1,73 @@ package clientP2P; import clientP2P.ClientManagementUDP; import clientP2P.ClientManagementTCP; +import serverP2P.ServerManagementUDP; +import serverP2P.ServerManagementTCP; +import tools.Directories; +import tools.Logger; +import tools.LogLevel; import tools.Directories; import java.util.Scanner; import java.util.List; import tools.HostItem; import tools.HostList; +import java.lang.NumberFormatException; public class ClientP2P { + static private final String subdir = "seeded/"; + static private String parts = ".parts"; + private Logger logger; private String host; private int port; private Directories directories; private List hostList; - private String parts = ".parts"; - public ClientP2P() { - directories = new Directories("P2P_JAVA_PROJECT_CLIENT"); + private static final int defaultPort = 20000; + + public void initLogger() { + if (directories == null && logger == null) { + directories = new Directories("P2P_JAVA_PROJECT" + port); + logger = new Logger(directories.getDataHomeDirectory() + "server.log"); + } + } + + public ClientP2P(String portStr) { + try{ + port = Integer.valueOf(Integer.parseInt(portStr)); + } catch (NumberFormatException e){ + int oldPort = port; + port = defaultPort; + initLogger(); + System.err.println("Error incorrect port " + oldPort + " using default port " + defaultPort); + logger.write("incorrect port " + oldPort + " using default port " + defaultPort, LogLevel.Info); + } + initLogger(); + directories.createSubdir(subdir); directories.createSubdir(parts); host = "localhost"; - port = 40001; - System.out.println("Client will try to contact server at " + host + " on port " + port + ". It will save files in " + directories.getDataHomeDirectory()); - directories.askOpenDataHomeDirectory(null); + System.out.println("Server will listen on port " + port + " and serve files from " + directories.getDataHomeDirectory() + subdir); + directories.askOpenDataHomeDirectory(subdir); System.out.println("Please enter list of servers to use; first one will be used to ask list of files"); hostList = HostList.getServList(); } public static void main(String [] args) { - ClientP2P c = new ClientP2P(); - System.out.println("Which transport protocol do you want to use? [TCP/udp]"); + ClientP2P c; + try { + c = new ClientP2P(args[1]); + } catch (IndexOutOfBoundsException e){ + c = new ClientP2P("" + defaultPort); + } + + ServerManagementUDP smudp = new ServerManagementUDP(c.directories.getDataHomeDirectory() + subdir, c.port, c.logger); + ServerManagementTCP smtcp = new ServerManagementTCP(c.directories.getDataHomeDirectory() + subdir, c.port, c.logger); + Thread tudp = new Thread(smudp); + tudp.setName("server UDP P2P-JAVA-PROJECT"); + tudp.start(); + Thread ttcp = new Thread(smtcp); + ttcp.setName("server TCP P2P-JAVA-PROJECT"); + ttcp.start(); + + System.out.println("Client : Which transport protocol do you want to use? [TCP/udp]"); Scanner sc = new Scanner(System.in); String transportchoosen = sc.nextLine(); Thread t; diff --git a/src/tools/Logger.java b/src/tools/Logger.java index 047c4b8..63c2107 100644 --- a/src/tools/Logger.java +++ b/src/tools/Logger.java @@ -30,7 +30,7 @@ public class Logger { /** Appends log to filelog and print to stderr. * @param text Text to log */ - private void write(String text, LogLevel logLevel) { + public void write(String text, LogLevel logLevel) { String msg = "[" + new Timestamp(System.currentTimeMillis()) + "] " + text + "\n"; String level = null; switch (logLevel) { @@ -109,7 +109,7 @@ public class Logger { public void writeUDP(String text, LogLevel logLevel) { write("[UDP] " + text, logLevel); } - + /** Appends log to filelog and print to stderr. * Adds [UDP] in log line. * @param text Text to log From 9369176432da1ace725062e3bb14ac6c3c5b1386 Mon Sep 17 00:00:00 2001 From: Louis Date: Wed, 18 Mar 2020 18:31:15 +0100 Subject: [PATCH 5/5] Fix server start Fix printing --- src/clientP2P/ClientP2P.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/clientP2P/ClientP2P.java b/src/clientP2P/ClientP2P.java index c6f8b63..654829e 100644 --- a/src/clientP2P/ClientP2P.java +++ b/src/clientP2P/ClientP2P.java @@ -47,7 +47,6 @@ public class ClientP2P { System.out.println("Server will listen on port " + port + " and serve files from " + directories.getDataHomeDirectory() + subdir); directories.askOpenDataHomeDirectory(subdir); System.out.println("Please enter list of servers to use; first one will be used to ask list of files"); - hostList = HostList.getServList(); } public static void main(String [] args) { @@ -67,6 +66,13 @@ public class ClientP2P { ttcp.setName("server TCP P2P-JAVA-PROJECT"); ttcp.start(); + try { + Thread.sleep(100); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + } + + c.hostList = HostList.getServList(); System.out.println("Client : Which transport protocol do you want to use? [TCP/udp]"); Scanner sc = new Scanner(System.in); String transportchoosen = sc.nextLine();