dav/sync.c

changeset 536
877f7c4a203b
parent 533
5b9f20aa88c2
child 539
8deb52292c99
--- a/dav/sync.c	Sat Mar 23 10:04:18 2019 +0100
+++ b/dav/sync.c	Tue Mar 26 17:30:34 2019 +0100
@@ -41,6 +41,8 @@
 #include <ucx/properties.h>
 #include <dirent.h>
 
+#include <math.h>
+
 #include <libidav/webdav.h>
 #include <libidav/utils.h>
 #include <libidav/crypto.h>
@@ -77,9 +79,10 @@
     { DAV_NS, "status" },
     { DAV_NS, "finfo" },
     { DAV_NS, "tags" },
-    { DAV_NS, "xattributes" }
+    { DAV_NS, "xattributes" },
+    { DAV_NS, "split" }
 };
-static size_t numdefprops = 5;
+static size_t numdefprops = 6;
 
 /*
  * strcmp version that works with NULL pointers
@@ -1415,7 +1418,8 @@
                 error = 1;
             }
         } else {
-            if(cdt && remote_resource_is_changed(sn, dir, db, res, local_res)) {
+            int changed = remote_resource_is_changed(sn, dir, db, res, local_res);
+            if(cdt && changed) {
                 printf("conflict: %s\n", local_res->path);
                 local_res->last_modified = 0;
                 local_res->skipped = TRUE;
@@ -2009,6 +2013,15 @@
         if(db_res->last_modified == res->last_modified && db_res->size == res->size) {
             return 0;
         }
+        
+        if(db_res->parts) {
+            // if the resource is splitted, move the part infos to the new
+            // LocalResource obj, because we need it later
+            res->parts = db_res->parts;
+            res->numparts = db_res->numparts;
+            db_res->parts = NULL;
+            db_res->numparts = 0;
+        }
     } else {
         res->tags_updated = 1;
         res->finfo_updated = 1;
@@ -2026,11 +2039,12 @@
         LocalResource *res)
 {
     DavPropName properties[] = {
-        {"DAV:","getetag"},
-        {DAV_NS,"tags"},
-        {DAV_NS,VERSION_PATH_PROPERTY}
+        {"DAV:", "getetag"},
+        {DAV_NS, "tags"},
+        {DAV_NS, "version-collection"},
+        {DAV_NS, "split" }
     };
-    int err = dav_load_prop(remote, properties, 3);
+    int err = dav_load_prop(remote, properties, 4);
     
     if(res->restore) {
         return 0;
@@ -2058,6 +2072,55 @@
     return ret;
 }
 
+size_t resource_get_blocksize(SyncDirectory *dir, LocalResource *local, DavResource *res, off_t filesize) {
+    size_t local_blocksize = 0;
+    if(local->blocksize < 0) {
+        // file splitting disabled
+        return 0;
+    } else if(local->blocksize > 0) {
+        local_blocksize = (size_t)local->blocksize;
+    } else {
+        UCX_FOREACH(elm, dir->splitconfig) {
+            SplitConfig *sc = elm->data;
+            if(sc->pattern) {
+                if(regexec(sc->pattern, local->path, 0, NULL, 0) != 0) {
+                    continue;
+                }
+            }
+            
+            if(sc->minsize > 0) {
+                if(filesize < sc->minsize) {
+                    continue;
+                }
+            }
+            
+            local_blocksize = sc->blocksize;
+            break;
+        }
+    }
+    
+    size_t svr_blocksize = 0;
+    char *svr_blocksize_str = dav_get_string_property_ns(res, DAV_NS, "split");
+    if(svr_blocksize_str) {
+        uint64_t i = 0;
+        if(util_strtouint(svr_blocksize_str, &i)) {
+            svr_blocksize = (size_t)i;
+        }
+    }
+    
+    if(local_blocksize > 0 && svr_blocksize > 0) {
+        fprintf(stderr, "Warning: Blocksize mismatch: %s: local: %zu server: %zu\n", local->path, local_blocksize, svr_blocksize);
+        return svr_blocksize;
+    } else if(local_blocksize > 0) {
+        return local_blocksize;
+    } else if(svr_blocksize > 0) {
+        return svr_blocksize;
+    }
+    
+    return 0;
+    
+}
+
 int resource_pathlen_cmp(LocalResource *res1, LocalResource *res2, void *n) {
     size_t s1 = strlen(res1->path);
     size_t s2 = strlen(res2->path);
@@ -2603,6 +2666,238 @@
     }
 }
 
+// this macro is only a workaround for a netbeans bug    
+#define LOG10 log10
+
+static UcxList* upload_parts(
+        LocalResource *local,
+        DavResource *res,
+        FILE *in,
+        uint64_t filesize,
+        size_t blocksize,
+        uint64_t *blockcount,
+        int *err)
+{ 
+    // Make sure the resource is a collection. If it was a normal
+    // resource until now, delete it and recreate it as collection
+    if(res->exists) {
+        if(!res->iscollection) {
+            if(dav_delete(res)) {
+                print_resource_error(res->session, res->path);
+                *err = 1;
+                return NULL;
+            }
+            res->exists = 0;
+            return upload_parts(local, res, in, filesize, blocksize, blockcount, err);
+        }
+    } else {
+        res->iscollection = 1;
+        if(dav_create(res)) {
+            print_resource_error(res->session, res->path);
+            *err = 1;
+            return NULL;
+        }
+    }
+    res->exists = 1;
+    
+    if(!res->href) {
+        // this should never happen, but just make sure it doesn't crash
+        fprintf(stderr, "href is NULL\n");
+        *err = 1;
+        return NULL;
+    }
+    
+    char *buffer = malloc(blocksize);
+    if(!buffer) {
+        fprintf(stderr, "Out of memory\n");
+        *err = 1;
+        return NULL;
+    }
+    
+    // calculate the maximal length of resource names
+    // names should have all the same length and contain the block number
+    int nblocks = filesize / blocksize;
+    int digits = LOG10((double)nblocks) + 1;
+    if(digits > 127) {
+        fprintf(stderr, "Too many parts\n");
+        *err = 1;
+        return NULL;
+    }
+    
+    UcxMap *updated_parts_map = ucx_map_new((nblocks/2)+64);
+    
+    int blockindex = 0;
+    int uploaded_parts = 0;
+    size_t r;
+    
+    // temporarly disable name encryption, because we don't need it for
+    // part names
+    uint32_t session_flags = res->session->flags;
+    res->session->flags ^= DAV_SESSION_ENCRYPT_NAME;
+    
+    while((r = fread(buffer, 1, blocksize, in)) > 0) {
+        int upload_block = 0;
+        char *block_hash = dav_create_hash(buffer, r);
+        if(blockindex >= local->numparts) {
+            // we don't have a hash for this block, therefore it must be new
+            upload_block = 1;
+        } else {
+            FilePart part = local->parts[blockindex];
+            if(!strcmp(part.hash, block_hash)) {
+                // no change
+                free(block_hash);
+                block_hash = NULL;
+            } else {
+                // block has changed
+                upload_block = 1;
+            }
+        }
+        
+        if(upload_block) {
+            char name[128];
+            snprintf(name, 128, "%0*d", digits, blockindex);
+            
+            char *part_href = util_concat_path(res->href, name);
+            DavResource *part = dav_resource_new_href(res->session, part_href);
+            free(part_href);
+            
+            // upload part
+            dav_set_content_data(part, buffer, r);
+            if(dav_store(part)) {
+                *err = 1;
+                print_resource_error(res->session, part->path);
+            } else {
+                // successfully uploaded part
+                
+                // store the FilePart in a map
+                // later we do a propfind and add the etag
+                FilePart *f = calloc(1, sizeof(FilePart));
+                f->block = blockindex;
+                f->hash = block_hash;
+                ucx_map_cstr_put(updated_parts_map, name, f);
+            }
+            dav_resource_free(part);
+            uploaded_parts++;
+        }
+        if(*err) {
+            break;
+        }
+        blockindex++;        
+    }
+    *blockcount = blockindex;
+    
+    // restore flags
+    res->session->flags = session_flags;
+    
+    free(buffer);
+    if(*err) {
+        ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free);
+        ucx_map_free(updated_parts_map);
+        return NULL;
+    }
+    
+    UcxList *updated_parts = NULL;
+    DavResource *parts = dav_query(res->session, "select D:getetag from %s order by name", res->path);
+    if(!parts) {
+        print_resource_error(res->session, parts->path);
+        *err = 1;
+        ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free);
+        ucx_map_free(updated_parts_map);
+        return NULL;
+    }
+    DavResource *part = parts->children;
+    while(part) {
+        FilePart *fp = ucx_map_cstr_remove(updated_parts_map, part->name);
+        // every part we uploaded is in the map
+        // if we get parts that are not in the map, someone else uploaded it
+        if(fp) {
+            char *etag = dav_get_string_property(part, "D:getetag");
+            if(etag) {
+                if(strlen(etag) > 2 && etag[0] == 'W' && etag[1] == '/') {
+                    etag = etag + 2;
+                }
+
+                fp->etag = strdup(etag);
+                updated_parts = ucx_list_append(updated_parts, fp);
+            } // else { wtf is wrong with this resource }
+        } else {
+            uint64_t name_partnum = 0;
+            char *res_name = part->name;
+            while(res_name[0] == '0' && res_name[1] != '\0') {
+                res_name++;
+            }
+            DavBool delete_part = 0;
+            if(strlen(part->name) != digits) {
+                delete_part = 1;
+            } else if(util_strtouint(res_name, &name_partnum)) {
+                if(name_partnum >= blockindex) {
+                    delete_part = 1;
+                }
+            }
+            
+            if(delete_part) {
+                if(dav_delete(part)) {
+                    print_resource_error(part->session, part->path);
+                }
+            }
+        }
+        part = part->next;
+    }
+    dav_resource_free_all(parts);
+        
+    ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free);
+    ucx_map_free(updated_parts_map);
+    
+    *err = 0;
+    return updated_parts;
+}
+
+static void update_parts(LocalResource *local, UcxList *updates, uint64_t numparts) {
+    size_t old_num = local->numparts;
+    if(old_num > numparts) {
+        // free old parts
+        for(size_t i=numparts;i<old_num;i++) {
+            FilePart p = local->parts[i];
+            if(p.etag) {
+                free(p.etag);
+            }
+            if(p.hash) {
+                free(p.hash);
+            }
+        }
+    }
+    if(numparts != local->numparts) {
+        local->parts = realloc(local->parts, numparts * sizeof(FilePart));
+        local->numparts = numparts;
+    }
+    
+    UCX_FOREACH(elm, updates) {
+        FilePart *p = elm->data;
+        if(p->block > numparts) {
+            // just make sure things don't explode in case some weird stuff
+            // is going on
+            continue;
+        }
+        
+        FilePart *old = &local->parts[p->block];
+        if(p->block < old_num) {
+            // cleanup existing part
+            if(old->hash) {
+                free(old->hash);
+                old->hash = NULL;
+            }
+            if(old->etag) {
+                free(old->etag);
+                old->etag = NULL;
+            }
+        }
+        old->block = p->block;
+        old->hash = p->hash;
+        old->etag = p->etag;
+        free(p);
+    }
+}
+
 int sync_put_resource(
         SyncDirectory *dir,
         DavResource *res,
@@ -2619,6 +2914,8 @@
         return -1;
     }
     
+    size_t split_blocksize = resource_get_blocksize(dir, local, res, s.st_size);
+    
     FILE *in = sys_fopen(local_path, "rb");
     if(!in) {
         fprintf(stderr, "Cannot open file %s\n", local_path);
@@ -2626,8 +2923,29 @@
         return -1;
     }
     
-    dav_set_content(res, in, (dav_read_func)myread, (dav_seek_func)file_seek);
-    dav_set_content_length(res, s.st_size);
+    DavBool issplit = split_blocksize == 0 ? FALSE : TRUE;
+    int split_err = 0;
+    UcxList *parts = NULL;
+    uint64_t blockcount = 0;
+    if(!issplit) {
+        // regular file upload 
+        dav_set_content(res, in, (dav_read_func)myread, (dav_seek_func)file_seek);
+        dav_set_content_length(res, s.st_size);
+    } else {
+        // splitted/partial upload
+        parts = upload_parts(
+                local,
+                res,
+                in,
+                s.st_size,
+                split_blocksize,
+                &blockcount,
+                &split_err);
+    }
+    if(split_err) {
+        free(local_path);
+        return -1;
+    }
     
     MetadataHashes hashes;
     hashes = sync_set_metadata_properties(dir, res->session, res, local);
@@ -2635,7 +2953,7 @@
     // before sync_put_resource, remote_resource_is_changed does a propfind
     // and sets res->exists
     int exists = res->exists;
-    if(dir->versioning && dir->versioning->always) {
+    if(dir->versioning && dir->versioning->always && !issplit) {
         int err = versioning_begin(dir, res);
         if(err) {
             fprintf(stderr, "Cannot store version for resource: %s\n", res->href);
@@ -2657,7 +2975,7 @@
         break;
     }
     
-    if(dir->versioning && dir->versioning->always) {
+    if(dir->versioning && dir->versioning->always && !issplit) {
         if(versioning_end(dir, res)) {
             fprintf(stderr, "Cannot checkin resource\n");
             ret = 1;
@@ -2668,13 +2986,14 @@
         (*counter)++;
         
         update_metadata_hashes(local, hashes);
+        update_parts(local, parts, blockcount);
         
         // check contentlength and get new etag
-        DavResource *up_res = dav_get(res->session, res->path, "D:getetag,idav:status,idav:tags");
+        DavResource *up_res = dav_get(res->session, res->path, "D:getetag,idav:status");
         
         if(up_res) {
             // the new content length must be equal or greater than the file size
-            if(up_res->contentlength < s.st_size) {
+            if(up_res->contentlength < s.st_size && !issplit) {
                 fprintf(stderr, "Incomplete Upload: %s\n", local_path);
                 ret = -1;
                 // try to set the resource status to 'broken'

mercurial