Ed: solved with the help of the async_stream crate.

I’m struggling with the borrow checker!

My problem: I’m using actix-web and rusqlite. I want to return an unlimited number of records from an rusqlite query, and actix provides a Stream trait for that kind of thing. You just impl the trait and return your records from a poll_next() fn.

On the rusqlite side, there’s this query_map that returns an iterator of records from a query. All I have to do is smush these two features together.

So the plan is to put the iterator returned by query_map into a struct that impls Stream. Problem is the lifetime of a var used by query_map. How to make the var have the same lifetime as the iterator??

So here’s the code:

pub struct ZkNoteStream<'a, T> {
  rec_iter: Box<dyn Iterator<Item = T> + 'a>,
}

// impl of Stream just calls next() on the iterator.  This compiles fine.
impl<'a> Stream for ZkNoteStream<'a, serde_json::Value> {
  type Item = serde_json::Value;

  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    Poll::Ready(self.rec_iter.next())
  }
}

// init function to set up the ZkNoteStream.
impl<'a> ZkNoteStream<'a, Result<ZkListNote, rusqlite::Error>> {
  pub fn init(
    conn: &'a Connection,
    user: i64,
    search: &ZkNoteSearch,
  ) -> Result<Self, Box<dyn Error>> {
    let (sql, args) = build_sql(&conn, user, search.clone())?;

    let sysid = user_id(&conn, "system")?;
    let mut pstmt = conn.prepare(sql.as_str())?;

    // Here's the problem!  Borrowing pstmt.
    let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
      let id = row.get(0)?;
      let sysids = get_sysids(&conn, sysid, id)?;
      Ok(ZkListNote {
        id: id,
        title: row.get(1)?,
        is_file: {
          let wat: Option<i64> = row.get(2)?;
          wat.is_some()
        },
        user: row.get(3)?,
        createdate: row.get(4)?,
        changeddate: row.get(5)?,
        sysids: sysids,
      })
    })?;

    Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
      rec_iter: Box::new(rec_iter),
    })
  }
}

And here’s the error:

error[E0515]: cannot return value referencing local variable `pstmt`
   --> server-lib/src/search.rs:170:5
    |
153 |       let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
    |                      ----- `pstmt` is borrowed here
...
170 | /     Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
171 | |       rec_iter: Box::new(rec_iter),
172 | |     })
    | |______^ returns a value referencing data owned by the current function

So basically it boils down to pstmt getting borrowed in the query_map call. It needs to have the same lifetime as the closure. How do I ensure that?

  • hallettj@beehaw.org
    link
    fedilink
    English
    arrow-up
    1
    ·
    edit-2
    7 months ago

    Well I’m curious to know what solutions there are to this. But I did reduce your problem to a smaller, self-contained example:

    struct ZKNoteStream&lt;'a, T> {
        rec_iter: Box + 'a>,
    }
    
    struct BorrowedThing(Vec);
    
    impl BorrowedThing {
        fn prepare(&amp;self) -> IterProducer&lt;'_> {
            IterProducer(&amp;self.0)
        }
    }
    
    struct IterProducer&lt;'a>(&amp;'a Vec);
    
    impl&lt;'a> IterProducer&lt;'a> {
        fn query_map(&amp;self) -> impl Iterator {
            self.0.into_iter()
        }
    }
    
    fn test(conn: &amp;BorrowedThing) -> ZKNoteStream&lt;'_, &amp;String> {
        let pstmt = conn.prepare();
        let rec_iter = pstmt.query_map();
    
        ZKNoteStream { // cannot return value referencing local variable `pstmt`
            rec_iter: Box::new(rec_iter),
        }
    }
    

    Edit: Wow, that code block came out mangled. Here is a Rust Playground link

    I ran into a similar problem yesterday, but it was a case where I had control over all the code. What I did was the equivalent of changing query_map so that instead of taking a reference to its receiver, it takes ownership.

    My thinking at the moment is that you may need to modify ZKNoteStream so that instead of containing the resulting iterator it contains pstmt, maybe paired with a function takes pstmt as an argument and returns the iterator. It seems like you should be able to create an FnOnce closure with a move keyword that moves ownership of pstmt into that closure, which you could then store in ZKNoteStream. But I’m not able to make that happen right now.

    • pr06lefs@lemmy.mlOP
      link
      fedilink
      arrow-up
      1
      ·
      7 months ago

      UPDATE!

      I sort of solved this part of it, or at least got it to compile. I’ve got a reddit post of this too! Someone there hinted that I should use another struct ‘above’ ZkNoteStream. I’m doing that in the code listing below.

      ZnsMaker has an init() fn, then you call make_stream() and it returns a ZkNoteStream. The intent is ZnsMaker should be managed so it lasts as long as the ZkNoteStream needs to last. All this bit compiles, great! But when I go to use it in my actix handler, I get a borrowing problem there instead. So I may have just kicked the can down the road.

      This part compiles. Wrong types still, should produce Bytes instead of ZkListNotes.

      pub struct ZkNoteStream&lt;'a, T> {
        rec_iter: Box + 'a>,
      }
      
      impl&lt;'a> Stream for ZkNoteStream&lt;'a, Result> {
        type Item = Result;
      
        fn poll_next(mut self: Pin&lt;&amp;mut Self>, cx: &amp;mut Context&lt;'_>) -> Poll> {
          Poll::Ready(self.rec_iter.next())
        }
      }
      
      pub struct ZnsMaker&lt;'a> {
        pstmt: rusqlite::Statement&lt;'a>,
        sysid: i64,
        args: Vec,
      }
      
      impl&lt;'a> ZnsMaker&lt;'a> {
        pub fn init(
          conn: &amp;'a Connection,
          user: i64,
          search: &amp;ZkNoteSearch,
        ) -> Result> {
          let (sql, args) = build_sql(&amp;conn, user, search.clone())?;
      
          let sysid = user_id(&amp;conn, "system")?;
      
          Ok(ZnsMaker {
            args: args,
            sysid: sysid,
            pstmt: conn.prepare(sql.as_str())?,
          })
        }
      
        pub fn make_stream(
          &amp;'a mut self,
          conn: &amp;'a Connection,  // have to pass the connection in here instead of storing in ZnsMaker, for Reasons.
        ) -> Result>, rusqlite::Error> {
          let sysid = self.sysid;
          let rec_iter =
            self
              .pstmt
              .query_map(rusqlite::params_from_iter(self.args.iter()), move |row| {
                let id = row.get(0)?;
                let sysids = get_sysids(&amp;conn, sysid, id)?;
                Ok(ZkListNote {
                  id: id,
                  title: row.get(1)?,
                  is_file: {
                    let wat: Option = row.get(2)?;
                    wat.is_some()
                  },
                  user: row.get(3)?,
                  createdate: row.get(4)?,
                  changeddate: row.get(5)?,
                  sysids: sysids,
                })
              })?;
      
          Ok(ZkNoteStream::&lt;'a, Result> {
            rec_iter: Box::new(rec_iter),
          })
        }
      }
      

      Ok and here’s the handler function where I receive a query and make the ZnsMaker. But if I create a ZkNoteStream with it, I get a borrowing error. Maybe it would be ok if I immediately consumed it in an HttpResponse::Ok().streaming(znsstream). Got to fix the types first though.

      pub async fn zk_interface_loggedin_streaming(
        config: &amp;Config,
        uid: i64,
        msg: &amp;UserMessage,
      ) -> Result> {
        match msg.what.as_str() {
          "searchzknotesstream" => {
            let msgdata = Option::ok_or(msg.data.as_ref(), "malformed json data")?;
            let search: ZkNoteSearch = serde_json::from_value(msgdata.clone())?;
            let conn = sqldata::connection_open(config.orgauth_config.db.as_path())?;
            let mut znsm = ZnsMaker::init(&amp;conn, uid, &amp;search)?;
            {
              // borrowed value of znsm doesn't live long enough!  wat do?
              let znsstream = &amp;znsm.make_stream(&amp;conn)?;
            }
            Err("wat".into())
          }
          wat => Err(format!("invalid 'what' code:'{}'", wat).into()),
        }
      }
      
      • hallettj@beehaw.org
        link
        fedilink
        English
        arrow-up
        1
        ·
        edit-2
        7 months ago

        I’m glad you found a workaround. I didn’t want to be defeated by the callback idea that I had yesterday so I worked on it some more and came up with a similar-but-different solution where ZnsMaker stores pstmt paired with a closure that borrows it. This is again a simplified version of your code that is self-contained:

        https://play.rust-lang.org/?version=stable&amp;mode=debug&amp;edition=2021&amp;gist=5bd6fb7c1cbf1c9c44c8f4bdbb1e8074

        The closure solution avoids the need to pass conn to make_stream. I don’t know if it would fix the new borrowing error that you got since I did not reproduce that error. But I think it might.

        Does znsstream need to outlive znsm? If so I think my solution solves the problem. Does znsstream need to outlive conn? That would be more complicated.

        Edit: Oh! You don’t need to put both the pstmt and a closure in ZnsMaker. Instead you can just store a closure if you move ownership of pstmt into the closure. That ensures that pstmt lives as long as the function that produces the iterator that you want:

        https://play.rust-lang.org/?version=stable&amp;mode=debug&amp;edition=2021&amp;gist=491258a7dc9bcad9dab08632d68c026b

        Edit: Lol you don’t need ZnsMaker after all. See my other top-level comment. I learned some things working on this, so time well spent IMO.

  • calcopiritus@lemmy.world
    link
    fedilink
    arrow-up
    1
    ·
    7 months ago

    I don’t think the problem is in the closure itself, since pstmt isn’t used inside the closure. To me, it looks like rec_iter has the lifetime of pstmt.

    pstmt is dropped at the end of the function. However, rec_iter isn’t dropped at the end of the function (it is returned). You should either pass pstmt from outside the function as a reference and have zknotestream a lifetime smaller than pstmt, or return pstmt along with rec_iter. Idk if the last option is detected by rust though.

    • pr06lefs@lemmy.mlOP
      link
      fedilink
      arrow-up
      1
      ·
      7 months ago

      The problem is that the return from the actix handler has to be HttpResult::Ok().stream(my_znsstream). Anything else in the function will get dropped before the HttpResult. So either the pstmt is somehow embedded in my_znsstream, or it doesn’t compile… : (

      • calcopiritus@lemmy.world
        link
        fedilink
        arrow-up
        1
        ·
        7 months ago

        Idk how actix works, but if there is no other way around it, you’ll have to store pstmt in some kind of global state. Maybe in a global Vec or HashMap that stores unfinished streams.

  • hallettj@beehaw.org
    link
    fedilink
    English
    arrow-up
    1
    ·
    7 months ago

    Ok I have a simple solution for you. What you need to do is to produce rec_iter in such a way that pstmt is dropped at the same time. You can do that by creating and calling pstmt in a block like this:

    let rec_iter = {
        let mut pstmt = conn.prepare(sql.as_str())?;
        pstmt.query_map(/* ... */)?
    };
    

    Here is a Rust Playground example

    • pr06lefs@lemmy.mlOP
      link
      fedilink
      arrow-up
      1
      ·
      edit-2
      7 months ago

      So close! The problem is that query_map doesn’t consume pstmt, it only references it.

      Error[E0515]: cannot return value referencing local variable `pstmt`
         --> server-lib/src/search.rs:191:5
          |
      162 |         let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
          |                        ----- `pstmt` is borrowed here
      ...
      191 | /     Ok(ZkNoteStream {
      192 | |       rec_iter: Box::new(bytes_iter),
      193 | |     })
          | |______^ returns a value referencing data owned by the current function
      

      (bytes_iter is rec_iter run through some map()s)

      • hallettj@beehaw.org
        link
        fedilink
        English
        arrow-up
        1
        ·
        7 months ago

        Did you try adding the block like I suggested? I reproduced the same error that you are seeing, and verified in my playground snippet that the block fixes the problem.

        It’s ok that query_map references pstmt. Isolating pstmt in a block causes it to be dropped at the end of the block while rec_iter continues to live. I think that might be thanks to temporary lifetime extension or something.

        • pr06lefs@lemmy.mlOP
          link
          fedilink
          arrow-up
          1
          ·
          edit-2
          7 months ago

          Yep, gave that a try (I think!). Here’s that version.

          pub struct ZkNoteStream&lt;'a> {
            rec_iter: Box + 'a>,
          }
          
          impl&lt;'a> ZkNoteStream&lt;'a> {
            pub fn init(conn: Connection, user: i64, search: &amp;ZkNoteSearch) -> Result> {
              let (sql, args) = build_sql(&amp;conn, user, search.clone())?;
          
              let sysid = user_id(&amp;conn, "system")?;
          
              let bytes_iter = {
                let mut pstmt = conn.prepare(sql.as_str())?;
                let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
                  let id = row.get(0)?;
                  Ok(ZkListNote {
                    id: id,
                    title: row.get(1)?,
                    is_file: {
                      let wat: Option = row.get(2)?;
                      wat.is_some()
                    },
                    user: row.get(3)?,
                    createdate: row.get(4)?,
                    changeddate: row.get(5)?,
                    sysids: Vec::new(),
                  })
                })?;
          
                let val_iter = rec_iter
                  .filter_map(|x| x.ok())
                  .map(|x| serde_json::to_value(x).map_err(|e| e.into()));
          
                val_iter
                  .filter_map(|x: Result| x.ok())
                  .map(|x| Bytes::from(x.to_string()))
              };
          
              Ok(ZkNoteStream {
                rec_iter: Box::new(bytes_iter),
              })
            }
          }
          
          impl&lt;'a> Stream for ZkNoteStream&lt;'a> {
            type Item = Bytes;
          
            fn poll_next(mut self: Pin&lt;&amp;mut Self>, cx: &amp;mut Context&lt;'_>) -> Poll> {
              Poll::Ready(self.rec_iter.next())
            }
          }
          

          This gets two errors, one for the conn and one for the pstmt:

          error[E0515]: cannot return value referencing local variable `pstmt`
             --> server-lib/src/search.rs:181:5
              |
          153 |         let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
              |                        ----- `pstmt` is borrowed here
          ...
          181 | /     Ok(ZkNoteStream {
          182 | |       rec_iter: Box::new(bytes_iter),
          183 | |     })
              | |______^ returns a value referencing data owned by the current function
          
          error[E0515]: cannot return value referencing function parameter `conn`
             --> server-lib/src/search.rs:181:5
              |
          152 |         let mut pstmt = conn.prepare(sql.as_str())?;
              |                         ---- `conn` is borrowed here
          ...
          181 | /     Ok(ZkNoteStream {
          182 | |       rec_iter: Box::new(bytes_iter),
          183 | |     })
              | |______^ returns a value referencing data owned by the current function
          
          • hallettj@beehaw.org
            link
            fedilink
            English
            arrow-up
            2
            ·
            7 months ago

            Oh I’m sorry! I messed up my test case so it only looked like the block fixed things.

            • pr06lefs@lemmy.mlOP
              link
              fedilink
              arrow-up
              2
              ·
              edit-2
              7 months ago

              I’m not quite ready to give up, but its not looking good. One of the rusqlite maintainers issued this haiku-like missive:

              yeah you just have to collect
              you can't return that as an iterator
              it needs to borrow from the statement
              

              Got to wondering how Vec does this interator-with-internal-state thing, and its with unsafe.

              update from the maintainer on discord:

              fundamentally you're asking for a self-referential type. e.g. one field borrows from another field of the same struct. cant  be done without  unsafe
              very easy to have soundness holes even if you use unsafe
              
                • pr06lefs@lemmy.mlOP
                  link
                  fedilink
                  arrow-up
                  2
                  ·
                  7 months ago

                  I’ve been looking into it a bit - not pinning per se but self referential. There’s a library called ouroboros that looks helpful. There’s even an example on github where someone uses rusqlite and ouroboros together.

                  So it seems like this should work:

                  #[self_referencing]
                  pub struct ZkNoteStream {
                    conn: Connection,
                    #[borrows(conn)]
                    pstmt: rusqlite::Statement&lt;'this>,
                    #[borrows(mut pstmt)]
                    #[covariant]
                    rec_iter: rusqlite::Rows&lt;'this>,
                  }
                  
                  impl ZkNoteStream {
                    pub fn init(conn: Connection, user: i64, search: &amp;ZkNoteSearch) -> Result> {
                      let (sql, args) = build_sql(&amp;conn, user, search.clone())?;
                  
                      Ok(
                        ZkNoteStreamTryBuilder {
                          conn: conn,
                          pstmt_builder: |conn: &amp;Connection| conn.prepare(sql.as_str()),
                          rec_iter_builder: |pstmt: &amp;mut rusqlite::Statement&lt;'_>| {
                            pstmt.query(rusqlite::params_from_iter(args.iter()))
                          },
                        }
                        .try_build()?,
                      )
                    }
                  }
                  

                  Unfortunately I get this:

                  error[E0597]: `pstmt` does not live long enough
                     --> server-lib/src/search.rs:880:1
                      |
                  880 | #[self_referencing]
                      | ^^^^^^^^^^^^^^^^^^-
                      | |                 |
                      | |                 `pstmt` dropped here while still borrowed
                      | |                 borrow might be used here, when `pstmt` is dropped and runs the `Drop` code for type `Statement`
                      | borrowed value does not live long enough
                      |
                      = note: this error originates in the attribute macro `self_referencing` (in Nightly builds, run with -Z macro-backtrace for more info)
                  

                  So close! But no cigar so far. No idea why its complaining.