22
33//! Leases controlling write access to an archive.
44
5+ use std:: process;
56use std:: time:: Duration ;
67
78use serde:: { Deserialize , Serialize } ;
89use thiserror:: Error ;
910use time:: OffsetDateTime ;
10- use tracing:: { debug, instrument} ;
11+ use tracing:: { debug, instrument, trace , warn } ;
1112use url:: Url ;
1213
13- use crate :: jsonio:: { self , read_json} ;
1414use crate :: transport:: { self , Transport , WriteMode } ;
1515
16- pub static LEASE_FILENAME : & str = "LEASE.json " ;
16+ pub static LEASE_FILENAME : & str = "LEASE" ;
1717
1818/// A lease on an archive.
1919#[ derive( Debug ) ]
@@ -33,6 +33,9 @@ pub enum Error {
3333 content : Box < LeaseContent > ,
3434 } ,
3535
36+ #[ error( "Existing lease file {url} is corrupt" ) ]
37+ Corrupt { url : Url } ,
38+
3639 #[ error( "Transport error on lease file: {source}" ) ]
3740 Transport {
3841 #[ from]
@@ -48,7 +51,7 @@ type Result<T> = std::result::Result<T, Error>;
4851impl Lease {
4952 /// Acquire a lease, if one is available.
5053 ///
51- /// Returns [Error::Busy] if the lease is already held by another process.
54+ /// Returns [Error::Busy] or [Error::Corrupt] if the lease is already held by another process.
5255 #[ instrument]
5356 pub fn acquire ( transport : & Transport ) -> Result < Self > {
5457 let lease_taken = OffsetDateTime :: now_utc ( ) ;
@@ -57,9 +60,10 @@ impl Lease {
5760 host : hostname:: get ( )
5861 . unwrap_or_default ( )
5962 . to_string_lossy ( )
60- . into_owned ( ) ,
61- pid : std:: process:: id ( ) ,
62- client_version : crate :: VERSION . to_string ( ) ,
63+ . into_owned ( )
64+ . into ( ) ,
65+ pid : Some ( process:: id ( ) ) ,
66+ client_version : Some ( crate :: VERSION . to_string ( ) ) ,
6367 lease_taken,
6468 lease_expiry,
6569 } ;
@@ -69,14 +73,17 @@ impl Lease {
6973 while let Err ( err) = transport. write ( LEASE_FILENAME , s. as_bytes ( ) , WriteMode :: CreateNew ) {
7074 if err. kind ( ) == transport:: ErrorKind :: AlreadyExists {
7175 match Lease :: peek ( transport) ? {
72- Some ( content) => {
76+ LeaseState :: Held ( content) => {
7377 return Err ( Error :: Busy {
7478 url,
7579 content : Box :: new ( content) ,
7680 } )
7781 }
78- None => {
79- debug ! ( "Lease file disappeared after conflict" ) ;
82+ LeaseState :: Corrupt ( _mtime) => {
83+ return Err ( Error :: Corrupt { url } ) ;
84+ }
85+ LeaseState :: Free => {
86+ debug ! ( "Lease file disappeared after conflict; retrying" ) ;
8087 continue ;
8188 }
8289 }
@@ -101,26 +108,47 @@ impl Lease {
101108 }
102109
103110 /// Return information about the current leaseholder, if any.
104- pub fn peek ( transport : & Transport ) -> Result < Option < LeaseContent > > {
105- read_json ( transport, LEASE_FILENAME ) . map_err ( |err| match err {
106- jsonio:: Error :: Transport { source, .. } => Error :: Transport { source } ,
107- jsonio:: Error :: Json { source, .. } => Error :: Json {
108- source,
109- url : transport. relative_file_url ( LEASE_FILENAME ) ,
110- } ,
111- } )
111+ pub fn peek ( transport : & Transport ) -> Result < LeaseState > {
112+ // TODO: Atomically get the content and mtime; that should be one call on s3.
113+ let metadata = match transport. metadata ( LEASE_FILENAME ) {
114+ Ok ( m) => m,
115+ Err ( err) if err. is_not_found ( ) => {
116+ trace ! ( "lease file not present" ) ;
117+ return Ok ( LeaseState :: Free ) ;
118+ }
119+ Err ( err) => {
120+ warn ! ( ?err, "error getting lease file metadata" ) ;
121+ return Err ( err. into ( ) ) ;
122+ }
123+ } ;
124+ let bytes = transport. read ( LEASE_FILENAME ) ?;
125+ match serde_json:: from_slice ( & bytes) {
126+ Ok ( content) => Ok ( LeaseState :: Held ( content) ) ,
127+ Err ( err) => {
128+ warn ! ( ?err, "error deserializing lease file" ) ;
129+ // We do still at least know that it's held, and when it was taken.
130+ Ok ( LeaseState :: Corrupt ( metadata. modified ) )
131+ }
132+ }
112133 }
113134}
114135
136+ #[ derive( Debug , Clone ) ]
137+ pub enum LeaseState {
138+ Free ,
139+ Held ( LeaseContent ) ,
140+ Corrupt ( OffsetDateTime ) ,
141+ }
142+
115143/// Contents of the lease file.
116- #[ derive( Debug , Serialize , Deserialize ) ]
144+ #[ derive( Debug , Serialize , Deserialize , Clone ) ]
117145pub struct LeaseContent {
118146 /// Hostname of the client process
119- pub host : String ,
147+ pub host : Option < String > ,
120148 /// Process id of the client.
121- pub pid : u32 ,
149+ pub pid : Option < u32 > ,
122150 /// Conserve version string.
123- pub client_version : String ,
151+ pub client_version : Option < String > ,
124152
125153 /// Time when the lease was taken.
126154 #[ serde( with = "time::serde::iso8601" ) ]
@@ -133,6 +161,9 @@ pub struct LeaseContent {
133161
134162#[ cfg( test) ]
135163mod test {
164+ use std:: fs:: { write, File } ;
165+ use std:: process;
166+
136167 use tempfile:: TempDir ;
137168
138169 use super :: * ;
@@ -142,14 +173,71 @@ mod test {
142173 let tmp = TempDir :: new ( ) . unwrap ( ) ;
143174 let transport = & Transport :: local ( tmp. path ( ) ) ;
144175 let lease = Lease :: acquire ( transport) . unwrap ( ) ;
145- assert ! ( tmp. path( ) . join( "LEASE.json " ) . exists( ) ) ;
176+ assert ! ( tmp. path( ) . join( "LEASE" ) . exists( ) ) ;
146177 assert ! ( lease. next_renewal > lease. lease_taken) ;
147178
148- let peeked = Lease :: peek ( transport) . unwrap ( ) . unwrap ( ) ;
149- assert_eq ! ( peeked. host, hostname:: get( ) . unwrap( ) . to_string_lossy( ) ) ;
150- assert_eq ! ( peeked. pid, std:: process:: id( ) ) ;
179+ let peeked = Lease :: peek ( transport) . unwrap ( ) ;
180+ let LeaseState :: Held ( content) = peeked else {
181+ panic ! ( "lease not held" )
182+ } ;
183+ assert_eq ! (
184+ content. host. unwrap( ) ,
185+ hostname:: get( ) . unwrap( ) . to_string_lossy( )
186+ ) ;
187+ assert_eq ! ( content. pid, Some ( process:: id( ) ) ) ;
151188
152189 lease. release ( ) . unwrap ( ) ;
153- assert ! ( !tmp. path( ) . join( "LEASE.json" ) . exists( ) ) ;
190+ assert ! ( !tmp. path( ) . join( "LEASE" ) . exists( ) ) ;
191+ }
192+
193+ #[ test]
194+ fn peek_fixed_lease_content ( ) {
195+ let tmp = TempDir :: new ( ) . unwrap ( ) ;
196+ let transport = & Transport :: local ( tmp. path ( ) ) ;
197+ write (
198+ tmp. path ( ) . join ( "LEASE" ) ,
199+ r#"
200+ {
201+ "host": "somehost",
202+ "pid": 1234,
203+ "client_version": "0.1.2",
204+ "lease_taken": "2021-01-01T12:34:56Z",
205+ "lease_expiry": "2021-01-01T12:35:56Z"
206+ }"# ,
207+ )
208+ . unwrap ( ) ;
209+ let state = Lease :: peek ( transport) . unwrap ( ) ;
210+ dbg ! ( & state) ;
211+ match state {
212+ LeaseState :: Held ( content) => {
213+ assert_eq ! ( content. host. unwrap( ) , "somehost" ) ;
214+ assert_eq ! ( content. pid, Some ( 1234 ) ) ;
215+ assert_eq ! ( content. client_version. unwrap( ) , "0.1.2" ) ;
216+ assert_eq ! ( content. lease_taken. year( ) , 2021 ) ;
217+ assert_eq ! ( content. lease_expiry. year( ) , 2021 ) ;
218+ assert_eq ! (
219+ content. lease_expiry - content. lease_taken,
220+ time:: Duration :: seconds( 60 )
221+ ) ;
222+ }
223+ _ => panic ! ( "lease should be recognized as held, got {state:?}" ) ,
224+ }
225+ }
226+
227+ /// An empty lease file is judged by its mtime; the lease can be grabbed a while
228+ /// after it was last written.
229+ #[ test]
230+ fn peek_corrupt_empty_lease ( ) {
231+ let tmp = TempDir :: new ( ) . unwrap ( ) ;
232+ let transport = & Transport :: local ( tmp. path ( ) ) ;
233+ File :: create ( tmp. path ( ) . join ( "LEASE" ) ) . unwrap ( ) ;
234+ let state = Lease :: peek ( transport) . unwrap ( ) ;
235+ match state {
236+ LeaseState :: Corrupt ( mtime) => {
237+ let now = time:: OffsetDateTime :: now_utc ( ) ;
238+ assert ! ( now - mtime < time:: Duration :: seconds( 15 ) ) ;
239+ }
240+ _ => panic ! ( "lease should be recognized as corrupt, got {state:?}" ) ,
241+ }
154242 }
155243}
0 commit comments