Tue, 26 Mar 2019 17:30:34 +0100
adds file splitting feature to dav-sync push
bsd.mk | file | annotate | diff | comparison | revisions | |
clang.mk | file | annotate | diff | comparison | revisions | |
dav/db.c | file | annotate | diff | comparison | revisions | |
dav/db.h | file | annotate | diff | comparison | revisions | |
dav/scfg.c | file | annotate | diff | comparison | revisions | |
dav/scfg.h | file | annotate | diff | comparison | revisions | |
dav/sync.c | file | annotate | diff | comparison | revisions | |
dav/sync.h | file | annotate | diff | comparison | revisions | |
gcc.mk | file | annotate | diff | comparison | revisions | |
osx.mk | file | annotate | diff | comparison | revisions | |
suncc.mk | file | annotate | diff | comparison | revisions |
--- a/bsd.mk Sat Mar 23 10:04:18 2019 +0100 +++ b/bsd.mk Tue Mar 26 17:30:34 2019 +0100 @@ -43,5 +43,5 @@ APP_EXT = DAV_CFLAGS = `curl-config --cflags` `pkg-config --cflags libxml-2.0` -DAV_LDFLAGS = -L/usr/lib -lssl -lcrypto `curl-config --libs` `pkg-config --libs libxml-2.0` -lpthread +DAV_LDFLAGS = -L/usr/lib -lssl -lcrypto `curl-config --libs` `pkg-config --libs libxml-2.0` -lpthread -lm
--- a/clang.mk Sat Mar 23 10:04:18 2019 +0100 +++ b/clang.mk Tue Mar 26 17:30:34 2019 +0100 @@ -43,5 +43,5 @@ APP_EXT = DAV_CFLAGS = `curl-config --cflags` `pkg-config --cflags openssl libxml-2.0` -DAV_LDFLAGS = `curl-config --libs` `pkg-config --libs openssl libxml-2.0` -lpthread +DAV_LDFLAGS = `curl-config --libs` `pkg-config --libs openssl libxml-2.0` -lpthread -lm
--- a/dav/db.c Sat Mar 23 10:04:18 2019 +0100 +++ b/dav/db.c Tue Mar 26 17:30:34 2019 +0100 @@ -33,6 +33,8 @@ #include "db.h" +#include <ucx/utils.h> + #include <libidav/utils.h> #include <libxml/encoding.h> @@ -106,6 +108,78 @@ } } +void process_parts(xmlTextReaderPtr reader, LocalResource *res) { + UcxList *parts = NULL; + + FilePart *current_part = NULL; + + size_t count = 0; + int field = -1; + int err = 0; + while(xmlTextReaderRead(reader)) { + int type = xmlTextReaderNodeType(reader); + const xmlChar *name = xmlTextReaderConstName(reader); + int depth = xmlTextReaderDepth(reader); + + int part = TRUE; + if(type == XML_READER_TYPE_ELEMENT) { + if(depth == 3 && xstreq(name, "part")) { + current_part = calloc(1, sizeof(FilePart)); + current_part->block = count; + parts = ucx_list_append(parts, current_part); + count++; + } else if(depth == 4) { + if(xstreq(name, "hash")) { + field = 0; + } else if(xstreq(name, "etag")) { + field = 1; + } + } + } else if(type == XML_READER_TYPE_END_ELEMENT) { + if(depth == 2) { + // </parts> + break; + } else if(depth == 3) { + if(current_part) { + if(!current_part->hash || !current_part->etag) { + err = 1; + } + } + // </part> + current_part = NULL; + } + field = -1; + } else if(type == XML_READER_TYPE_TEXT && depth == 5 && current_part) { + const char *text = (const char*)xmlTextReaderConstValue(reader); + if(field == 0) { + current_part->hash = strdup(text); + } else if(field = 1) { + current_part->etag = strdup(text); + } + } + } + + if(err) { + ucx_list_free_content(parts, (ucx_destructor)filepart_free); + ucx_list_free(parts); + return; + } + + FilePart *file_parts = calloc(count, sizeof(FilePart)); + size_t i = 0; + UCX_FOREACH(elm, parts) { + FilePart *p = elm->data; + file_parts[i] = *p; + free(p); + i++; + } + + ucx_list_free(parts); + + res->parts = file_parts; + res->numparts = count; +} + LocalResource* process_resource(xmlTextReaderPtr reader) { LocalResource *res = calloc(1, sizeof(LocalResource)); @@ -141,6 +215,8 @@ res->skipped = TRUE; } else if(xstreq(name, "tags-updated")) { res->tags_updated = TRUE; + } else if(xstreq(name, "parts")) { + process_parts(reader, res); } } else if(type == XML_READER_TYPE_TEXT) { const xmlChar *value = xmlTextReaderConstValue(reader); @@ -441,6 +517,47 @@ } } + if(res->numparts > 0) { + r = xmlTextWriterStartElement(writer, BAD_CAST "parts"); + if(r < 0) { + xmlFreeTextWriter(writer); + return -1; + } + for(size_t i=0;i<res->numparts;i++) { + FilePart p = res->parts[i]; + r = xmlTextWriterStartElement(writer, BAD_CAST "part"); + if(r < 0) { + xmlFreeTextWriter(writer); + return -1; + } + + if(p.hash) { + r = xmlTextWriterWriteElement(writer, BAD_CAST "hash", BAD_CAST p.hash); + if(r < 0) { + xmlFreeTextWriter(writer); + return -1; + } + } + if(p.etag) { + r = xmlTextWriterWriteElement(writer, BAD_CAST "etag", BAD_CAST p.etag); + if(r < 0) { + xmlFreeTextWriter(writer); + return -1; + } + } + r = xmlTextWriterEndElement(writer); + if(r < 0) { + xmlFreeTextWriter(writer); + return -1; + } + } + r = xmlTextWriterEndElement(writer); + if(r < 0) { + xmlFreeTextWriter(writer); + return -1; + } + } + // </resource> xmlTextWriterEndElement(writer); } @@ -517,3 +634,13 @@ } free(res); } + +void filepart_free(FilePart *part) { + if(part->etag) { + free(part->etag); + } + if(part->hash) { + free(part->hash); + } + free(part); +}
--- a/dav/db.h Sat Mar 23 10:04:18 2019 +0100 +++ b/dav/db.h Tue Mar 26 17:30:34 2019 +0100 @@ -46,7 +46,8 @@ #define DB_STORE_MODE 1 #define DB_STORE_OWNER 2 -typedef struct LocalResource LocalResource; +typedef struct LocalResource LocalResource; +typedef struct FilePart FilePart; typedef struct SyncDatabase SyncDatabase; struct LocalResource { @@ -66,6 +67,9 @@ char *xattr_hash; char *remote_tags_hash; + FilePart *parts; + size_t numparts; + int64_t blocksize; DavBool tags_updated; @@ -77,6 +81,12 @@ DavBool restore; }; +struct FilePart { + uint64_t block; + char *hash; + char *etag; +}; + struct SyncDatabase { UcxMap *resources; UcxMap *conflict; @@ -88,6 +98,8 @@ void local_resource_free(LocalResource *res); +void filepart_free(FilePart *part); + LocalResource* process_resource(xmlTextReaderPtr reader); LocalResource* process_conflict(xmlTextReaderPtr reader);
--- a/dav/scfg.c Sat Mar 23 10:04:18 2019 +0100 +++ b/dav/scfg.c Tue Mar 26 17:30:34 2019 +0100 @@ -300,7 +300,14 @@ } SplitConfig *sc = calloc(1, sizeof(SplitConfig)); - sc->pattern = pattern ? strdup(pattern) : NULL; + if(pattern) { + regex_t *regex = malloc(sizeof(regex_t)); + if (regcomp(regex, pattern, REG_EXTENDED|REG_NOSUB)) { + fprintf(stderr, "Invalid regular expression (%s)\n", pattern); + } else { + sc->pattern = regex; + } + } sc->minsize = minsz; sc->blocksize = (size_t)sz; return sc; @@ -522,6 +529,7 @@ dir->lockpush = lockpush; dir->lock_timeout = lock_timeout; dir->metadata = metadata; + dir->splitconfig = splitconfig; if(metadata & FINFO_MODE == FINFO_MODE) { dir->db_settings = DB_STORE_MODE; }
--- a/dav/scfg.h Sat Mar 23 10:04:18 2019 +0100 +++ b/dav/scfg.h Tue Mar 26 17:30:34 2019 +0100 @@ -80,7 +80,7 @@ /* * path regex pattern or NULL */ - char *pattern; + regex_t *pattern; /* * minimum file size for activating file splitting
--- a/dav/sync.c Sat Mar 23 10:04:18 2019 +0100 +++ b/dav/sync.c Tue Mar 26 17:30:34 2019 +0100 @@ -41,6 +41,8 @@ #include <ucx/properties.h> #include <dirent.h> +#include <math.h> + #include <libidav/webdav.h> #include <libidav/utils.h> #include <libidav/crypto.h> @@ -77,9 +79,10 @@ { DAV_NS, "status" }, { DAV_NS, "finfo" }, { DAV_NS, "tags" }, - { DAV_NS, "xattributes" } + { DAV_NS, "xattributes" }, + { DAV_NS, "split" } }; -static size_t numdefprops = 5; +static size_t numdefprops = 6; /* * strcmp version that works with NULL pointers @@ -1415,7 +1418,8 @@ error = 1; } } else { - if(cdt && remote_resource_is_changed(sn, dir, db, res, local_res)) { + int changed = remote_resource_is_changed(sn, dir, db, res, local_res); + if(cdt && changed) { printf("conflict: %s\n", local_res->path); local_res->last_modified = 0; local_res->skipped = TRUE; @@ -2009,6 +2013,15 @@ if(db_res->last_modified == res->last_modified && db_res->size == res->size) { return 0; } + + if(db_res->parts) { + // if the resource is splitted, move the part infos to the new + // LocalResource obj, because we need it later + res->parts = db_res->parts; + res->numparts = db_res->numparts; + db_res->parts = NULL; + db_res->numparts = 0; + } } else { res->tags_updated = 1; res->finfo_updated = 1; @@ -2026,11 +2039,12 @@ LocalResource *res) { DavPropName properties[] = { - {"DAV:","getetag"}, - {DAV_NS,"tags"}, - {DAV_NS,VERSION_PATH_PROPERTY} + {"DAV:", "getetag"}, + {DAV_NS, "tags"}, + {DAV_NS, "version-collection"}, + {DAV_NS, "split" } }; - int err = dav_load_prop(remote, properties, 3); + int err = dav_load_prop(remote, properties, 4); if(res->restore) { return 0; @@ -2058,6 +2072,55 @@ return ret; } +size_t resource_get_blocksize(SyncDirectory *dir, LocalResource *local, DavResource *res, off_t filesize) { + size_t local_blocksize = 0; + if(local->blocksize < 0) { + // file splitting disabled + return 0; + } else if(local->blocksize > 0) { + local_blocksize = (size_t)local->blocksize; + } else { + UCX_FOREACH(elm, dir->splitconfig) { + SplitConfig *sc = elm->data; + if(sc->pattern) { + if(regexec(sc->pattern, local->path, 0, NULL, 0) != 0) { + continue; + } + } + + if(sc->minsize > 0) { + if(filesize < sc->minsize) { + continue; + } + } + + local_blocksize = sc->blocksize; + break; + } + } + + size_t svr_blocksize = 0; + char *svr_blocksize_str = dav_get_string_property_ns(res, DAV_NS, "split"); + if(svr_blocksize_str) { + uint64_t i = 0; + if(util_strtouint(svr_blocksize_str, &i)) { + svr_blocksize = (size_t)i; + } + } + + if(local_blocksize > 0 && svr_blocksize > 0) { + fprintf(stderr, "Warning: Blocksize mismatch: %s: local: %zu server: %zu\n", local->path, local_blocksize, svr_blocksize); + return svr_blocksize; + } else if(local_blocksize > 0) { + return local_blocksize; + } else if(svr_blocksize > 0) { + return svr_blocksize; + } + + return 0; + +} + int resource_pathlen_cmp(LocalResource *res1, LocalResource *res2, void *n) { size_t s1 = strlen(res1->path); size_t s2 = strlen(res2->path); @@ -2603,6 +2666,238 @@ } } +// this macro is only a workaround for a netbeans bug +#define LOG10 log10 + +static UcxList* upload_parts( + LocalResource *local, + DavResource *res, + FILE *in, + uint64_t filesize, + size_t blocksize, + uint64_t *blockcount, + int *err) +{ + // Make sure the resource is a collection. If it was a normal + // resource until now, delete it and recreate it as collection + if(res->exists) { + if(!res->iscollection) { + if(dav_delete(res)) { + print_resource_error(res->session, res->path); + *err = 1; + return NULL; + } + res->exists = 0; + return upload_parts(local, res, in, filesize, blocksize, blockcount, err); + } + } else { + res->iscollection = 1; + if(dav_create(res)) { + print_resource_error(res->session, res->path); + *err = 1; + return NULL; + } + } + res->exists = 1; + + if(!res->href) { + // this should never happen, but just make sure it doesn't crash + fprintf(stderr, "href is NULL\n"); + *err = 1; + return NULL; + } + + char *buffer = malloc(blocksize); + if(!buffer) { + fprintf(stderr, "Out of memory\n"); + *err = 1; + return NULL; + } + + // calculate the maximal length of resource names + // names should have all the same length and contain the block number + int nblocks = filesize / blocksize; + int digits = LOG10((double)nblocks) + 1; + if(digits > 127) { + fprintf(stderr, "Too many parts\n"); + *err = 1; + return NULL; + } + + UcxMap *updated_parts_map = ucx_map_new((nblocks/2)+64); + + int blockindex = 0; + int uploaded_parts = 0; + size_t r; + + // temporarly disable name encryption, because we don't need it for + // part names + uint32_t session_flags = res->session->flags; + res->session->flags ^= DAV_SESSION_ENCRYPT_NAME; + + while((r = fread(buffer, 1, blocksize, in)) > 0) { + int upload_block = 0; + char *block_hash = dav_create_hash(buffer, r); + if(blockindex >= local->numparts) { + // we don't have a hash for this block, therefore it must be new + upload_block = 1; + } else { + FilePart part = local->parts[blockindex]; + if(!strcmp(part.hash, block_hash)) { + // no change + free(block_hash); + block_hash = NULL; + } else { + // block has changed + upload_block = 1; + } + } + + if(upload_block) { + char name[128]; + snprintf(name, 128, "%0*d", digits, blockindex); + + char *part_href = util_concat_path(res->href, name); + DavResource *part = dav_resource_new_href(res->session, part_href); + free(part_href); + + // upload part + dav_set_content_data(part, buffer, r); + if(dav_store(part)) { + *err = 1; + print_resource_error(res->session, part->path); + } else { + // successfully uploaded part + + // store the FilePart in a map + // later we do a propfind and add the etag + FilePart *f = calloc(1, sizeof(FilePart)); + f->block = blockindex; + f->hash = block_hash; + ucx_map_cstr_put(updated_parts_map, name, f); + } + dav_resource_free(part); + uploaded_parts++; + } + if(*err) { + break; + } + blockindex++; + } + *blockcount = blockindex; + + // restore flags + res->session->flags = session_flags; + + free(buffer); + if(*err) { + ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free); + ucx_map_free(updated_parts_map); + return NULL; + } + + UcxList *updated_parts = NULL; + DavResource *parts = dav_query(res->session, "select D:getetag from %s order by name", res->path); + if(!parts) { + print_resource_error(res->session, parts->path); + *err = 1; + ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free); + ucx_map_free(updated_parts_map); + return NULL; + } + DavResource *part = parts->children; + while(part) { + FilePart *fp = ucx_map_cstr_remove(updated_parts_map, part->name); + // every part we uploaded is in the map + // if we get parts that are not in the map, someone else uploaded it + if(fp) { + char *etag = dav_get_string_property(part, "D:getetag"); + if(etag) { + if(strlen(etag) > 2 && etag[0] == 'W' && etag[1] == '/') { + etag = etag + 2; + } + + fp->etag = strdup(etag); + updated_parts = ucx_list_append(updated_parts, fp); + } // else { wtf is wrong with this resource } + } else { + uint64_t name_partnum = 0; + char *res_name = part->name; + while(res_name[0] == '0' && res_name[1] != '\0') { + res_name++; + } + DavBool delete_part = 0; + if(strlen(part->name) != digits) { + delete_part = 1; + } else if(util_strtouint(res_name, &name_partnum)) { + if(name_partnum >= blockindex) { + delete_part = 1; + } + } + + if(delete_part) { + if(dav_delete(part)) { + print_resource_error(part->session, part->path); + } + } + } + part = part->next; + } + dav_resource_free_all(parts); + + ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free); + ucx_map_free(updated_parts_map); + + *err = 0; + return updated_parts; +} + +static void update_parts(LocalResource *local, UcxList *updates, uint64_t numparts) { + size_t old_num = local->numparts; + if(old_num > numparts) { + // free old parts + for(size_t i=numparts;i<old_num;i++) { + FilePart p = local->parts[i]; + if(p.etag) { + free(p.etag); + } + if(p.hash) { + free(p.hash); + } + } + } + if(numparts != local->numparts) { + local->parts = realloc(local->parts, numparts * sizeof(FilePart)); + local->numparts = numparts; + } + + UCX_FOREACH(elm, updates) { + FilePart *p = elm->data; + if(p->block > numparts) { + // just make sure things don't explode in case some weird stuff + // is going on + continue; + } + + FilePart *old = &local->parts[p->block]; + if(p->block < old_num) { + // cleanup existing part + if(old->hash) { + free(old->hash); + old->hash = NULL; + } + if(old->etag) { + free(old->etag); + old->etag = NULL; + } + } + old->block = p->block; + old->hash = p->hash; + old->etag = p->etag; + free(p); + } +} + int sync_put_resource( SyncDirectory *dir, DavResource *res, @@ -2619,6 +2914,8 @@ return -1; } + size_t split_blocksize = resource_get_blocksize(dir, local, res, s.st_size); + FILE *in = sys_fopen(local_path, "rb"); if(!in) { fprintf(stderr, "Cannot open file %s\n", local_path); @@ -2626,8 +2923,29 @@ return -1; } - dav_set_content(res, in, (dav_read_func)myread, (dav_seek_func)file_seek); - dav_set_content_length(res, s.st_size); + DavBool issplit = split_blocksize == 0 ? FALSE : TRUE; + int split_err = 0; + UcxList *parts = NULL; + uint64_t blockcount = 0; + if(!issplit) { + // regular file upload + dav_set_content(res, in, (dav_read_func)myread, (dav_seek_func)file_seek); + dav_set_content_length(res, s.st_size); + } else { + // splitted/partial upload + parts = upload_parts( + local, + res, + in, + s.st_size, + split_blocksize, + &blockcount, + &split_err); + } + if(split_err) { + free(local_path); + return -1; + } MetadataHashes hashes; hashes = sync_set_metadata_properties(dir, res->session, res, local); @@ -2635,7 +2953,7 @@ // before sync_put_resource, remote_resource_is_changed does a propfind // and sets res->exists int exists = res->exists; - if(dir->versioning && dir->versioning->always) { + if(dir->versioning && dir->versioning->always && !issplit) { int err = versioning_begin(dir, res); if(err) { fprintf(stderr, "Cannot store version for resource: %s\n", res->href); @@ -2657,7 +2975,7 @@ break; } - if(dir->versioning && dir->versioning->always) { + if(dir->versioning && dir->versioning->always && !issplit) { if(versioning_end(dir, res)) { fprintf(stderr, "Cannot checkin resource\n"); ret = 1; @@ -2668,13 +2986,14 @@ (*counter)++; update_metadata_hashes(local, hashes); + update_parts(local, parts, blockcount); // check contentlength and get new etag - DavResource *up_res = dav_get(res->session, res->path, "D:getetag,idav:status,idav:tags"); + DavResource *up_res = dav_get(res->session, res->path, "D:getetag,idav:status"); if(up_res) { // the new content length must be equal or greater than the file size - if(up_res->contentlength < s.st_size) { + if(up_res->contentlength < s.st_size && !issplit) { fprintf(stderr, "Incomplete Upload: %s\n", local_path); ret = -1; // try to set the resource status to 'broken'
--- a/dav/sync.h Sat Mar 23 10:04:18 2019 +0100 +++ b/dav/sync.h Tue Mar 26 17:30:34 2019 +0100 @@ -131,6 +131,8 @@ DavResource *remote, LocalResource *res); +size_t resource_get_blocksize(SyncDirectory *dir, LocalResource *local, DavResource *res, off_t filesize); + int resource_pathlen_cmp(LocalResource *res1, LocalResource *res2, void *n); DavResource *versioning_simple_find(DavResource *res, const char *version);
--- a/gcc.mk Sat Mar 23 10:04:18 2019 +0100 +++ b/gcc.mk Tue Mar 26 17:30:34 2019 +0100 @@ -43,5 +43,5 @@ APP_EXT = DAV_CFLAGS = `curl-config --cflags` `pkg-config --cflags openssl libxml-2.0` -DAV_LDFLAGS = `curl-config --libs` `pkg-config --libs openssl libxml-2.0` -lpthread +DAV_LDFLAGS = `curl-config --libs` `pkg-config --libs openssl libxml-2.0` -lpthread -lm
--- a/osx.mk Sat Mar 23 10:04:18 2019 +0100 +++ b/osx.mk Tue Mar 26 17:30:34 2019 +0100 @@ -43,4 +43,4 @@ APP_EXT = DAV_CFLAGS = `xml2-config --cflags` -DAV_LDFLAGS = -framework CoreFoundation -lcurl -lxml2 -lpthread +DAV_LDFLAGS = -framework CoreFoundation -lcurl -lxml2 -lpthread -lm
--- a/suncc.mk Sat Mar 23 10:04:18 2019 +0100 +++ b/suncc.mk Tue Mar 26 17:30:34 2019 +0100 @@ -43,5 +43,5 @@ APP_EXT = DAV_CFLAGS = `curl-config --cflags` `pkg-config --cflags openssl libxml-2.0` -DAV_LDFLAGS = `pkg-config --libs openssl libxml-2.0` `curl-config --libs` -lpthread +DAV_LDFLAGS = `pkg-config --libs openssl libxml-2.0` `curl-config --libs` -lpthread -lm