/*
  SYNQ by QBOLEC done in 2004
  transfer  = log(|NEW|) * |diff(OLD,NEW)|
  pcu-usage = log(|NEW|) * |OLD|
*/
// NOTE(review): this copy of the file went through a lossy text extraction.
// Wherever a '<' was directly followed by a letter, everything up to the next
// '>' was deleted. The affected statements are kept exactly as they survive
// and are flagged individually with "DAMAGED" notes below; they must be
// restored from a clean copy before the file can compile.
using Nemerle;
using Nemerle.Collections;
using Nemerle.IO;
using Nemerle.Utility;
using System;
using System.Console;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.ValueType;
using System.Collections.Specialized;
using System.Security.Cryptography;

// A fragment of the new file: (offset, size), as destructured throughout.
type Frag=int*int;

namespace Synq{

  // Global traffic counters, updated by every read/write helper below and
  // printed by SynqServer.Run / SynqClient.Run.
  module SynqTraffic{
    public mutable bytesSent :int;
    public mutable bytesReceived :int;
    this(){
      bytesSent=0;
      bytesReceived=0;
    }
  }

  // Protocol constants plus the low-level integer / bit-list (de)serialisers.
  module SynqDetails{
    public port : int;             // TCP port used by both Serve and Download
    public weakHashBytes : int;    // bytes of weak hash sent per SearchHash
    public strongHashBytes : int;  // bytes of strong hash sent per SearchHash
    public verifyHashBytes : int;  // bytes of verification hash sent per VerifyHash
    public explicitBytes : int;    // fragments no larger than this are sent as raw bytes
    public weakHashMask : int;     // mask applied by SearchHash.composeWeakHash

    // NOTE(review): DAMAGED. The text after "(1<" is missing. What follows
    // uses `mouth`, `acc`, `cnt` and `gogogo`, none of which are declared in
    // the surviving text, so the end of ExplodeBits and the header of the
    // bit-list writer (called as SynqDetails.Write(found,mouth) elsewhere)
    // were presumably lost here. The surviving arms write `acc` as a 4-byte
    // word and add 4 to bytesSent each time.
    public ExplodeBits(data:int):list[bool]{
      def loop(pos){
        | 32 => [];
        | _ => ((data %& (1< ()
        | ([],_,_) => mouth.Write(acc);SynqTraffic.bytesSent+=4;
        | (_,32,_) => mouth.Write(acc);SynqTraffic.bytesSent+=4; gogogo(data,0,0);
        | (true::tail,_,_) => gogogo(tail,cnt+1, acc%| (1< gogogo(tail,cnt+1, acc );
      }
      gogogo(data,0,0);
    }

    // Writes `data` to `mouth` byte by byte and adds `bytes` to bytesSent.
    // NOTE(review): DAMAGED. The loop header lost its text after "i"
    // (presumably a bound on `bytes` and the start of the Write call).
    public Write(data:int, bytes:int,mouth:BinaryWriter):void{
      unchecked{
        for(mutable i=0;i>(8*i)):>byte );
      }
      SynqTraffic.bytesSent+=bytes;
    }

    // Reads an integer from `ear` and adds `bytes` to bytesReceived.
    // NOTE(review): DAMAGED. The loop header and the start of the
    // accumulation expression are missing.
    public Read(bytes : int,ear:BinaryReader):int{
      SynqTraffic.bytesReceived+=bytes;
      mutable res=0;
      unchecked{
        for(mutable i=0;iint)<<(8*i));
      }
      res;
    }

    // Keeps only the fragments whose size exceeds explicitBytes, i.e. the
    // ones that are described by hashes rather than shipped verbatim.
    public NonTrivialFragments(fragments : list[Frag]):list[Frag]{
      List.Filter( fragments, fun(_,size ){ size > explicitBytes } );
    }

    public this(){
      port=1337;
      weakHashBytes=3;
      strongHashBytes=3;
      verifyHashBytes=4;
      weakHashMask=(1<<(8*SynqDetails.weakHashBytes))-1;
      // size of two SearchHash records plus one VerifyHash record
      explicitBytes=(weakHashBytes+strongHashBytes)*2+verifyHashBytes;
    }
  }

  // Anything the server can send to describe one fragment.
  interface FragmentDescription{
    Write(mouth:BinaryWriter):void;
  }

  // Description of a small fragment: the raw bytes themselves.
  class ExplicitBytes:FragmentDescription{
    public data:array[byte];

    // Sends the raw bytes and accounts for them in bytesSent.
    public Write(mouth:BinaryWriter):void{
      SynqTraffic.bytesSent+=data.Length;
      mouth.Write(data,0,data.Length);
    }

    // Loads `size` bytes at `offset` from the file.
    // NOTE(review): the count returned by Read is ignored, so a short read
    // goes unnoticed.
    public this(eye: FileStream, fragment : Frag){
      def (offset,size)=fragment;
      data=array(size);
      ignore(eye.Seek(offset:>long,SeekOrigin.Begin));
      ignore(eye.Read(data,0,size));
    }

    // Receives `size` raw bytes from the peer and accounts for them.
    public this(ear:BinaryReader,size : int){
      data=array(size);
      SynqTraffic.bytesReceived+=data.Length;
      ignore(ear.Read(data,0,size));
    }
  }

  // Description of a large fragment: a weak rolling hash plus a strong
  // (MD5-derived) hash that the client searches for in its old file.
  class SearchHash:FragmentDescription{
    public weakHash : int;
    public strongHash : int;

    public Write(mouth : BinaryWriter):void{
      SynqDetails.Write(weakHash,SynqDetails.weakHashBytes,mouth);
      SynqDetails.Write(strongHash,SynqDetails.strongHashBytes,mouth);
    }

    // Combines the two running sums into the masked weak hash.
    public static composeWeakHash(sum :int,gro :int):int{
      unchecked{
        (sum%^gro)%&(SynqDetails.weakHashMask);
      }
    }

    // Strong hash of a fragment read from a file.
    public static Strong(eye : FileStream, fragment : Frag):int{
      def (offset,size)=fragment;
      def data = array(size);
      ignore(eye.Seek(offset:>long,SeekOrigin.Begin));
      ignore(eye.Read(data,0,size));
      Strong(data,0,size);
    }

    // Strong hash of data[offset .. offset+size): MD5 digest bytes folded
    // into an int, 8 bits at a time.
    // NOTE(review): DAMAGED. The loop header is missing (presumably bounded
    // by SynqDetails.strongHashBytes, by analogy with VerifyHash).
    public static Strong(data:array[byte],offset:int,size:int):int{
      def md5 = MD5CryptoServiceProvider();
      def hash = md5.ComputeHash(data,offset,size);
      mutable result=0;
      for(mutable i=0;iint); }
      result;
    }

    // Server side: computes both hashes of a fragment of the new file.
    public this(eye : FileStream, fragment : Frag){
      strongHash=Strong(eye,fragment);
      def (offset,size)=fragment;
      ignore(eye.Seek(offset:>long,SeekOrigin.Begin));
      def buffer=array(size);
      ignore(eye.Read(buffer,0,size));
      mutable sum=0;// ...+w+y+z
      mutable gro=0;// ...+3*w+2*y+1*z
      // NOTE(review): DAMAGED. The loop header and the `sum` update are missing.
      for(mutable i=0;iint); gro+=sum; } }
      weakHash=composeWeakHash(sum,gro);
    }

    // Client side: receives both hashes from the peer.
    public this(ear : BinaryReader){
      weakHash=SynqDetails.Read(SynqDetails.weakHashBytes,ear);
      strongHash=SynqDetails.Read(SynqDetails.strongHashBytes,ear);
    }
  }

  // Hash the client sends back for every fragment it claims to have found,
  // so the server can confirm the match (see ServerPacket.Verify).
  // It is built from MD5 digest bytes 8 .. 8+verifyHashBytes-1.
  class VerifyHash{
    public hash : int;

    public Write(mouth : BinaryWriter):void{
      SynqDetails.Write(hash,SynqDetails.verifyHashBytes,mouth);
    }

    // Computes the verification hash of data[offset .. offset+size).
    public this(data:array[byte],offset:int,size:int){
      def md5 = MD5CryptoServiceProvider();
      def digest = md5.ComputeHash(data,offset,size);
      mutable result=0;
      for(mutable i=8;i<8+SynqDetails.verifyHashBytes;++i){
        result<<=8;
        result|=(digest[i]:>int);
      }
      // Fix: the folded value used to be dropped (a local named `hash`
      // shadowed the field, which was never assigned), so the two sides
      // ended up comparing unassigned fields.
      this.hash=result;
    }

    // Computes the verification hash of a fragment read from a file.
    public this(eye : FileStream,fragment : Frag){
      def (offset,size)=fragment;
      def data = array(size);
      ignore(eye.Seek(offset:>long,SeekOrigin.Begin));
      ignore(eye.Read(data,0,size));
      def md5 = MD5CryptoServiceProvider();
      def digest = md5.ComputeHash(data);
      mutable result=0;
      for(mutable i=8;i<8+SynqDetails.verifyHashBytes;++i){
        result<<=8;
        result|=(digest[i]:>int);
      }
      // Fix: store the result (see the other constructor).
      this.hash=result;
    }

    // Receives a verification hash from the peer.
    public this(ear:BinaryReader){
      hash=SynqDetails.Read(SynqDetails.verifyHashBytes,ear);
    }
  }

  // One client -> server message: which of the hashed fragments were found
  // in the old file, plus a VerifyHash for each found one.
  class ClientPacket{
    //represents fragments that was unfinished before sending this packet
    public fragments : list[Frag];
    //exact data sent
    public mutable found : list[bool];
    public mutable hashes : list[VerifyHash];
    public bigger : int;
    public smaller : int;

    public Write(mouth : BinaryWriter):void{
      SynqDetails.Write(found,mouth);
      List.Iter(hashes, fun(hash:VerifyHash){hash.Write(mouth)} );
    }

    // Reads one bit per fragment, pulling a fresh 32-bit word from `ear`
    // (and counting 4 received bytes) whenever the current bits run out.
    public ReadFound(fragments: list[Frag],bits:list[bool],ear:BinaryReader):list[bool]{
      | (_::tail , bit::tailBits , _) => bit::ReadFound(tail,tailBits,ear);
      | (_::_ , [] , _) =>SynqTraffic.bytesReceived+=4;ReadFound(fragments,SynqDetails.ExplodeBits(ear.ReadInt32()),ear);
      | _ => []
    }

    // Reads one VerifyHash for every `true` entry of `found`.
    public ReadHashes(found: list[bool],ear:BinaryReader):list[VerifyHash]{
      | (true::tail,_) => def hash=VerifyHash(ear); hash::ReadHashes(tail,ear);
      | (false::tail,_) => ReadHashes(tail,ear);
      | _ =>[]
    }

    // Client side: applies the server's descriptions. Explicit bytes are
    // written straight into `pen`; hashed fragments are searched for in the
    // old file `eye` with a rolling weak hash and copied into `pen` on a
    // strong-hash match.
    public this(previous : ServerPacket,eye : FileStream,pen :FileStream){
      fragments= previous.fragments;
      bigger=previous.bigger;
      smaller=previous.smaller;
      mutable fragmentsLeft=0;

      // Copies cnt bytes from eye@from to pen@to and marks one fragment done.
      // Moves the position of `eye`.
      def copy(from:int ,to:int, cnt:int){
        --fragmentsLeft;
        ignore(eye.Seek(from:>long,SeekOrigin.Begin));
        ignore(pen.Seek(to:>long,SeekOrigin.Begin));
        def buff=array(cnt);
        ignore(eye.Read(buff,0,cnt));
        ignore(pen.Write(buff,0,cnt));
      }
      // A missing hashtable entry (null) is treated as the empty list.
      def null2nil(x){ if(x==null) [] else x }
      // Reports "not found" for every hashed fragment without searching.
      def denyEverything(){
        def deny(fragments){
          | (_,size)::tailFragments when size > SynqDetails.explicitBytes => false::deny(tailFragments);
          | _::tailFragments => deny(tailFragments);
          | _ => [];
        }
        found=deny(fragments);
        hashes=[];
      }
      def FindFragments(descriptions : list [FragmentDescription]){
        //step 1. build finding info Map weakHash -> strongHash*offset
        def interestingB=Hashtable(); // fragments of any size other than `smaller`
        def interestingS=Hashtable(); // fragments of size `smaller`
        def matched=Hashtable();      // target offset -> VerifyHash of the copied data
        def identifyFragment(fragment,description : FragmentDescription){
          | ((offset:int,size),_) =>
            if( size > SynqDetails.explicitBytes ){
              def hash=description:>SearchHash;
              ++fragmentsLeft;
              if (size==smaller)
                interestingS[hash.weakHash]= (hash.strongHash,offset ) ::null2nil(interestingS[hash.weakHash]);
              else
                interestingB[hash.weakHash]= (hash.strongHash,offset ) ::null2nil(interestingB[hash.weakHash]);
            }else{
              def explicit=description:>ExplicitBytes;
              ignore(pen.Seek(offset:>long,SeekOrigin.Begin));
              ignore(pen.Write(explicit.data,0,size));
            }
        }
        List.Iter2(fragments,descriptions,identifyFragment);
        if((fragmentsLeft==0) || (smaller > (eye.Length:>int))){
          denyEverything();
        }else{
          //step 2. keep both eyes open
          // `eye` reads the leading edge of the window, `rearEye` the trailing one.
          def rearEye=FileStream(eye.Name,FileMode.Open,FileAccess.Read);
          ignore(eye.Seek(0L,SeekOrigin.Begin));
          ignore(rearEye.Seek(0L,SeekOrigin.Begin));
          def initBuffer=array(smaller);
          ignore(eye.Read(initBuffer,0,smaller));
          //step 3. startup hashes
          mutable sum=0;// ...+w+y+z
          mutable gro=0;// ...+3*w+2*y+1*z
          // NOTE(review): DAMAGED. The loop header and the `sum` update are
          // missing (presumably a pass over initBuffer).
          for(mutable i=0;iint); gro+=sum; } }
          //step 4. gogogo
          mutable pos=smaller; // offset of the next byte to enter the window
          mutable occ=0;       // offset of the first byte in the window
          def sh=SearchHash.composeWeakHash(sum,gro);
          when(interestingS.ContainsKey(sh)){
            def strong=SearchHash.Strong(eye,(0,smaller));
            List.Iter(interestingS[sh],fun(h:int,o:int){
              when(h==strong && !matched.ContainsKey(o)){
                copy(0,o, smaller);
                matched[o]=VerifyHash(eye,(0,smaller))
              }
            });
            ignore(eye.Seek(smaller:>long,SeekOrigin.Begin));
          }
          def bufferSize=1000000;
          def cleaner=bigger;
          // NOTE(review): DAMAGED. The condition of this `if`, the declaration
          // of `commonBuffer` and the start of the `for` header are missing.
          // The matching `else` branch below handles the other case with two
          // separate buffers.
          if(smallerint));pos+=bufferSize){
              ignore(rearEye.Seek(occ:>long,SeekOrigin.Begin));
              ignore(rearEye.Read(commonBuffer,0,bufferSize+smaller));
              mutable i=smaller;
              mutable ri=0;
              // NOTE(review): DAMAGED. The `while` condition and the `sum`
              // update are missing.
              unchecked{ while(riint); ++i; gro+=sum;
                  def bh=SearchHash.composeWeakHash(sum,gro);
                  when(interestingB.ContainsKey(bh)){
                    def strong=SearchHash.Strong(commonBuffer,ri,bigger);
                    List.Iter(interestingB[bh],fun(h:int,o:int){when(h==strong && !matched.ContainsKey(o)){copy(occ,o,bigger); matched[o]=VerifyHash(commonBuffer,ri,bigger)} });
                  }
                  // drop the oldest byte from the window
                  def rc=(commonBuffer[ri]:>int);
                  ++ri;
                  sum-=rc;
                  gro-=cleaner*rc;
                  ++occ;
                  def sh=SearchHash.composeWeakHash(sum,gro);
                  when(interestingS.ContainsKey(sh)){
                    def strong=SearchHash.Strong(commonBuffer,ri,smaller);
                    List.Iter(interestingS[sh],fun(h:int,o:int){ when(h==strong && !matched.ContainsKey(o)){ copy(occ,o, smaller); matched[o]=VerifyHash(commonBuffer,ri,smaller)} });
                  }
                }
              }
            }
          }else{
            def frontBuffer=array(bufferSize);
            def rearBuffer=array(bufferSize);
            for(;(fragmentsLeft!=0) && ((pos+bufferSize)<(eye.Length:>int));pos+=bufferSize){
              ignore(rearEye.Read(rearBuffer,0,bufferSize));
              ignore(eye.Read(frontBuffer,0,bufferSize));
              // NOTE(review): DAMAGED. The inner loop header and the `sum`
              // update are missing.
              unchecked{ for(mutable i=0;iint); gro+=sum;
                  def bh=SearchHash.composeWeakHash(sum,gro);
                  when(interestingB.ContainsKey(bh)){
                    // Strong() and copy() move `eye`; restore it afterwards.
                    def oldpos=eye.Position;
                    def strong=SearchHash.Strong(eye,(occ,bigger));
                    List.Iter(interestingB[bh],fun(h:int,o:int){when(h==strong && !matched.ContainsKey(o)){copy(occ,o,bigger); matched[o]=VerifyHash(eye,(occ,bigger))} });
                    ignore(eye.Seek(oldpos,SeekOrigin.Begin));
                  }
                  def rc=(rearBuffer[i]:>int);
                  sum-=rc;
                  gro-=cleaner*rc;
                  ++occ;
                  def sh=SearchHash.composeWeakHash(sum,gro);
                  when(interestingS.ContainsKey(sh)){
                    def oldpos=eye.Position;
                    def strong=SearchHash.Strong(eye,(occ,smaller));
                    List.Iter(interestingS[sh],fun(h:int,o:int){ when(h==strong && !matched.ContainsKey(o)){ copy(occ,o, smaller); matched[o]=VerifyHash(eye,(occ,smaller))} });
                    ignore(eye.Seek(oldpos,SeekOrigin.Begin));
                  }
                }
              }
            }
          }
          // Tail of the file: continue one byte at a time straight from the streams.
          ignore(rearEye.Seek(occ:>long,SeekOrigin.Begin));
          ignore(eye.Seek(pos:>long,SeekOrigin.Begin));
          unchecked{
            for(;(fragmentsLeft!=0) && (pos<(eye.Length:>int));++pos){
              sum+=eye.ReadByte();
              gro+=sum;
              def bh=SearchHash.composeWeakHash(sum,gro);
              when(interestingB.ContainsKey(bh)){
                def oldpos=eye.Position;
                def strong=SearchHash.Strong(eye,(occ,bigger));
                List.Iter(interestingB[bh],fun(h:int,o:int){when(h==strong && !matched.ContainsKey(o)){copy(occ,o,bigger); matched[o]=VerifyHash(eye,(occ,bigger))} });
                ignore(eye.Seek(oldpos,SeekOrigin.Begin));
              }
              def rc=rearEye.ReadByte();
              sum-=rc;
              gro-=cleaner*rc;
              ++occ;
              def sh=SearchHash.composeWeakHash(sum,gro);
              when(interestingS.ContainsKey(sh)){
                def oldpos=eye.Position;
                def strong=SearchHash.Strong(eye,(occ,smaller));
                List.Iter(interestingS[sh],fun(h:int,o:int){ when(h==strong && !matched.ContainsKey(o)){ copy(occ,o, smaller); matched[o]=VerifyHash(eye,(occ,smaller))} });
                ignore(eye.Seek(oldpos,SeekOrigin.Begin));
              }
            }
          }
          // Fix: the second handle on the old file was never released; one
          // was leaked for every packet round.
          rearEye.Close();
          //step 5. generate lists
          // Recurses to the end first and prepends on the way back, so
          // `found` and `hashes` end up in fragment order.
          def makeResults(fragments){
            | (offset,_)::tailFragments =>
              makeResults(tailFragments);
              if(matched.ContainsKey(offset)){
                found=true::found;
                hashes=matched[offset]::hashes;
              }else{
                found=false::found;
              }
            | _ =>
              found=[];
              hashes=[];
          }
          makeResults(SynqDetails.NonTrivialFragments(fragments));
        }
      }
      FindFragments(previous.descriptions);
    }

    public this(previous : ServerPacket, ear : BinaryReader){//used by server to understand incoming data from client
      fragments=previous.fragments;
      bigger=previous.bigger;
      smaller=previous.smaller;
      found = ReadFound(SynqDetails.NonTrivialFragments(fragments),[],ear);
      hashes = ReadHashes(found,ear);
    }
  }

  // One server -> client message: accept/reject bits for the client's last
  // claims, followed by descriptions of the fragments still outstanding.
  class ServerPacket{
    //represents exact data sent:
    public acc : list[bool];
    public descriptions : list[FragmentDescription];
    //knowledge after constructing , just before sending
    public fragments : list[Frag];
    public bigger : int;
    public smaller : int;

    public Write(mouth : BinaryWriter):void{
      SynqDetails.Write(acc,mouth);
      List.Iter(descriptions,fun(description:FragmentDescription){description.Write(mouth)});
    }

    // For every fragment the client reported as found, recomputes the
    // VerifyHash from the new file and compares it with the client's.
    // Yields one bool per found fragment.
    public Verify(fragments: list[Frag], found : list[bool] ,hashes : list[VerifyHash],eye:FileStream):list[bool]{
      | (_::tailFragments,false::tailFound,_,_) => Verify(tailFragments,tailFound,hashes,eye);
      | (fragment::tailFragments,true::tailFound,hash::tailHashes,_) => (VerifyHash(eye,fragment).hash==hash.hash)::Verify(tailFragments,tailFound,tailHashes,eye);
      | _ => []
    }

    // Builds the next round's fragment list: a fragment that was not found,
    // or was found but rejected, is split into two halves; an accepted one
    // is dropped. `res` accumulates in reverse and is flipped at the end.
    public static Filter(acc : list[bool], found : list[bool], fragments : list[Frag],res :list[Frag] ):list[Frag]{
      | (_,false::tailFound,(offset,size)::tailFragments,_) =>
        def firstSize=size>>1;
        def secondSize=size-firstSize;
        Filter(acc,tailFound,tailFragments,(offset+firstSize,secondSize)::(offset,firstSize)::res);
      | (false::tailAcc,_::tailFound,(offset,size)::tailFragments,_) =>
        def firstSize=size>>1;
        def secondSize=size-firstSize;
        Filter(tailAcc,tailFound,tailFragments,(offset+firstSize,secondSize)::(offset,firstSize)::res);
      | (true::tailAcc,_::tailFound,_::tailFragments,_) => Filter(tailAcc,tailFound,tailFragments,res);
      | _ => List.Rev(res);
    }

    // Server side: hashes for large fragments, raw bytes for small ones.
    public ComputeHashes(fragments: list[Frag],eye : FileStream):list[FragmentDescription]{
      List.Map(fragments, fun(fragment){
        match(fragment){
          | (_,size) when size > SynqDetails.explicitBytes => SearchHash(eye,fragment);
          | _ => ExplicitBytes(eye,fragment);
        }
      } );
    }

    // Reads one accept bit for every fragment the client reported as found,
    // pulling a fresh 32-bit word from `ear` whenever the bits run out.
    public ReadAcc(fragments:list[Frag],found:list[bool],bits:list[bool],ear:BinaryReader):list[bool]{
      | (_::tailFragments,true::tailFound , bit::tailBits , _) => bit::ReadAcc(tailFragments,tailFound,tailBits,ear);
      | (_::tailFragments,false::tailFound , _ , _) => ReadAcc(tailFragments,tailFound,bits,ear);
      | (_::_,_, [] , _) =>SynqTraffic.bytesReceived+=4;ReadAcc(fragments,found,SynqDetails.ExplodeBits(ear.ReadInt32()),ear);
      | _ => []
    }

    // Client side counterpart of ComputeHashes: reads one description per fragment.
    public ReadHashes(fragments : list[Frag], ear : BinaryReader):list[FragmentDescription]{
      List.Map(fragments, fun(_,size:int){
        if( size >SynqDetails.explicitBytes)
          SearchHash(ear);
        else
          ExplicitBytes(ear,size);
      } );
    }

    public this(previous : ClientPacket,ear : BinaryReader){//CLIENT uses this
      def oldFragments=SynqDetails.NonTrivialFragments(previous.fragments);
      acc=ReadAcc(oldFragments,previous.found,[],ear);
      smaller=previous.smaller>>1;
      bigger=smaller+1;
      fragments=Filter(acc,previous.found,oldFragments,[]);
      descriptions=ReadHashes(fragments,ear);
    }

    public this(ear : BinaryReader,size : Int64){//CLIENT uses this
      acc=[];
      smaller=(size:>int);
      bigger=smaller+1;
      fragments=[(0,smaller)];
      descriptions=ReadHashes(fragments,ear);
    }

    public this(previous : ClientPacket,eye : FileStream){//SERVER uses this
      def oldFragments= SynqDetails.NonTrivialFragments(previous.fragments);
      acc=Verify(oldFragments,previous.found,previous.hashes,eye);
      smaller=previous.smaller>>1;
      bigger=smaller+1;
      fragments=Filter(acc,previous.found,oldFragments,[]);
      descriptions=ComputeHashes(fragments,eye);
    }

    public this(size : Int64, eye : FileStream){//SERVER uses this
      acc=[];
      smaller=(size:>int);
      bigger=smaller+1;
      fragments=[(0,smaller)];
      descriptions=ComputeHashes(fragments,eye);
    }
  }

  // Server loop: send a packet, print progress, and repeat with the packet
  // derived from the client's answer until no fragments remain.
  class SynqServer{
    public Run(myPacket : ServerPacket, eye: FileStream, mouth : BinaryWriter, ear : BinaryReader):void{
      myPacket.Write( mouth );
      printf("\r[todo:%d, sent:%d, received:%d] ",List.FoldLeft( myPacket.fragments , 0 , fun( f:Frag,acc:int) { match(f){(_,s)=>acc+s} } ),SynqTraffic.bytesSent,SynqTraffic.bytesReceived);
      match(myPacket.fragments){
        | [] => printf("\nSuccesfuly synchronized file\n");
        | _ => Run(ServerPacket(ClientPacket(myPacket,ear),eye),eye,mouth,ear);
      }
    }

    // Announces the new file's length (8 bytes), then starts the loop with a
    // single fragment covering the whole file.
    public this(eye: FileStream, mouth : BinaryWriter, ear : BinaryReader ){
      def totalSize = eye.Length;
      printf("File size of new version: %li bytes\n", totalSize);
      mouth.Write(totalSize);
      SynqTraffic.bytesSent+=8;
      Run(ServerPacket(totalSize,eye),eye,mouth,ear);
    }
  }

  // Client loop, mirror image of SynqServer: `eye` is the old file, `pen`
  // the file being rebuilt.
  class SynqClient{
    public Run(myPacket : ClientPacket, eye: FileStream, pen : FileStream, mouth : BinaryWriter, ear : BinaryReader):void{
      myPacket.Write(mouth);
      printf("\r[todo:%d, sent:%d, received:%d] ",List.FoldLeft( myPacket.fragments , 0 , fun( f:Frag,acc:int) { match(f){(_,s)=>acc+s} } ),SynqTraffic.bytesSent,SynqTraffic.bytesReceived);
      match(myPacket.fragments){
        | [] => printf("\nSuccesfuly synchronized file\n");
        | _ => Run(ClientPacket(ServerPacket(myPacket,ear), eye,pen),eye,pen,mouth,ear);
      }
    }

    // Reads the new file's length, sizes the output file accordingly and
    // starts the loop.
    public this(eye: FileStream, pen : FileStream, mouth : BinaryWriter, ear : BinaryReader){
      def totalSize=ear.ReadInt64();
      SynqTraffic.bytesReceived+=8;
      printf("File size :\nof old file:\t%li bytes\nof new file:\t%li bytes\n", eye.Length,totalSize);
      pen.SetLength(totalSize);
      Run(ClientPacket(ServerPacket(ear,totalSize),eye,pen),eye,pen,mouth,ear);
    }
  }

  // Command-line entry point.
  module SynqApp{
    // "-s filename": waits for one TCP connection and serves the file over it.
    private Serve(args:array[string]):void{
      match(args.Length){
        | 2 =>
          def fileName=args[1];
          try{
            def file=FileStream(fileName,FileMode.Open,FileAccess.Read);
            def listener=TcpListener(SynqDetails.port);
            listener.Start();
            printf("Waiting for incomming connection at port: %d",SynqDetails.port);
            def client=listener.AcceptTcpClient();
            WriteLine("\rGot connection! ");
            ignore(SynqServer(file,BinaryWriter(client.GetStream()), BinaryReader(client.GetStream()) ));
          }catch{
            | _ is FileNotFoundException => printf("File not found: %s\n",fileName);
            | e is SocketException => printf("Problem with connection: %s\n",e.Message);
            | e => printf("Problem: %s\n",e.Message);
          }
        | _ => ShowUsage();
      }
    }

    // "-d filename hostname": rebuilds the host's version of the file into
    // "<filename>.new", using the local file as the old version.
    private Download(args:array[string]):void{
      match(args.Length){
        | 3 =>
          def fileName=args[1];
          def hostName=args[2];
          try{
            def file=FileStream(fileName,FileMode.Open,FileAccess.Read);
            printf("Connecting to:%s",hostName);
            def client=TcpClient();
            client.Connect(hostName,SynqDetails.port);
            WriteLine("\rGot connection! ");
            WriteLine("Downloaded version will be written to file "+ fileName+ ".new");
            def newFile=FileStream(fileName+".new",FileMode.OpenOrCreate,FileAccess.Write);
            ignore(SynqClient(file,newFile,BinaryWriter(client.GetStream()), BinaryReader(client.GetStream())));
            // Fix: close the output so buffered writes are flushed to disk
            // instead of relying on finalisation at process exit.
            newFile.Close();
          }catch{
            | _ is FileNotFoundException => printf("File not found: %s\n",fileName);
            | e is SocketException => printf("Problem with connection to %s: %s\n",hostName,e.Message);
            | e => printf("Problem: %s\n",e.Message);
          }
        | _ => ShowUsage();
      }
    }

    private ShowUsage():void{
      WriteLine("Usage:\nsynq.exe -s filename\n\tto serve file\nsynq.exe -d filename hostname\n\tto download file from host");
    }

    public Main(args: array[string]):void{
      if(args.Length==0){
        ShowUsage();
      }else{
        match(args[0]){
          | "-s" => Serve(args);
          | "-d" => Download(args);
          | _ => ShowUsage();
        }
      }
    }
  }
}