both
multiprocessing?
makes sharing state hard
software for a concurrent world
BEAM
defmodule Downloader.Mixfile do
  use Mix.Project

  # Build settings for the :downloader project.
  # `deps()` and `Mix.env()` are written with parentheses: a bare zero-arity
  # call is ambiguous with a variable reference and is deprecated (or
  # rejected) by newer Elixir compilers.
  def project do
    [
      app: :downloader,
      version: "0.0.1",
      deps: deps(),
      elixir: ">= 1.2.0",
      build_embedded: Mix.env() == :prod,
      start_permanent: Mix.env() == :prod
    ]
  end

  # OTP application spec: :logger, :httpoison and :poolboy are started
  # before this application, which is then booted via `Downloader.start/2`.
  # NOTE(review): :erlport and :poison are dependencies but are not listed in
  # :applications; with an explicit list they are neither started nor
  # bundled into a release — confirm that is intended.
  def application do
    [
      applications: [:logger, :httpoison, :poolboy],
      mod: {Downloader, []}
    ]
  end

  # Third-party dependencies.
  # NOTE(review): erlport is fetched from git without a pinned :ref/:tag, so
  # builds are not reproducible. `"> 0.8.0"` for poison excludes 0.8.0
  # itself and has no upper bound — confirm both are intended.
  defp deps do
    [
      {:erlport, git: "https://github.com/hdima/erlport.git"},
      {:poolboy, "~> 1.5"},
      {:httpoison, "~> 0.8.0"},
      {:poison, "> 0.8.0"}
    ]
  end
end
def start(_type, []) do
  # poolboy passes this value to each pooled worker's start_link/1;
  # neither pool needs per-worker arguments.
  pool_worker_args = nil

  processors_opts = [
    name: {:local, :processors},
    worker_module: Downloader.Processor,
    size: 4,
    max_overflow: 1
  ]

  downloaders_opts = [
    name: {:local, :downloaders},
    worker_module: Downloader.Worker,
    size: 10,
    max_overflow: 1
  ]

  # `worker/2` presumably comes from `import Supervisor.Spec` in the enclosing
  # module (not visible here).
  # NOTE(review): Supervisor.Spec is deprecated since Elixir 1.5; moving to
  # child-spec maps would mean raising the project's `elixir: ">= 1.2.0"`.
  # Agent registered as `Dn`, starting from an empty list.
  # NOTE(review): confirm what `Dn` is meant to hold.
  dn_agent = worker(Agent, [fn () -> [] end, [name: Dn]])
  server = worker(Downloader.Server, [])

  # Children start in list order: processor pool, downloader pool, agent,
  # then the server.
  children = [
    :poolboy.child_spec(:processors, processors_opts, pool_worker_args),
    :poolboy.child_spec(:downloaders, downloaders_opts, pool_worker_args),
    dn_agent,
    server
  ]

  sup_opts = [strategy: :one_for_one, name: Downloader.Supervisor]
  Supervisor.start_link(children, sup_opts)
end
22
@doc """
Asks the server to download `urls` asynchronously.

`download_urls/1` targets the process registered under this module's name;
`download_urls/2` targets an explicit `pid` (or registered name). Returns
`:ok` immediately, as `GenServer.cast/2` does not wait for delivery.
"""
def download_urls(urls) do
  download_urls(__MODULE__, urls)
end

def download_urls(pid, urls) do
  request = {:download, urls}
  GenServer.cast(pid, request)
end
5
# Handles `{:download, urls}`: runs the downloads in a separate process
# linked to this server, so the server keeps serving messages, then appends
# `urls` to the list held as state.
# NOTE(review): because of the link, an abnormal exit in the download
# process will also take this server down unless it traps exits (not visible
# here). The state also grows with every URL ever requested, and `++`
# copies it on each cast.
def handle_cast({:download, urls}, state) do
  urls
  |> download_fn()
  |> spawn_link()

  {:noreply, state ++ urls}
end
10
# Builds the zero-arity function that handle_cast/2 runs in its own process.
# For each URL it spawns a linked process that:
#   1. checks a worker out of the :downloaders pool and downloads the URL,
#   2. checks a worker out of the :processors pool and processes the response,
#   3. synchronously notifies this server with `{:done, url}`.
#
# `:poolboy.transaction/2` takes a one-argument fun that receives the
# checked-out worker pid; the previous `do:` keyword form left `worker`
# unbound. NOTE(review): assumes `Pool` aliases `:poolboy` — confirm in the
# module header.
defp download_fn(urls) do
  fn () ->
    Enum.each(urls, fn url ->
      spawn_link(fn () ->
        response =
          Pool.transaction(:downloaders, fn worker ->
            Worker.download(worker, url)
          end)

        # The processor's reply is not needed; it only signals completion.
        Pool.transaction(:processors, fn worker ->
          Processor.process(worker, response)
        end)

        GenServer.call(__MODULE__, {:done, url})
      end)
    end)
  end
end
1 defmodule Downloader.Processor do
2 use GenServer
3 alias Application, as: App
4
# Starts one processor server (the worker argument is ignored). The
# :downloader application's priv directory is converted to a charlist —
# presumably the form ErlPort's `python_path` option expects — and handed to
# init/1.
def start_link(_args) do
  python_path = to_char_list(App.app_dir(:downloader, "priv"))
  GenServer.start_link(__MODULE__, python_path)
end
9
# Synchronously hands `http_resp`, after conversion by convert_resp/1, to the
# processor at `processor_pid` and returns its reply (`:ok`, per
# handle_call/3). Uses GenServer.call/2's default 5s timeout.
def process(processor_pid, http_resp) do
  request = convert_resp(http_resp)
  GenServer.call(processor_pid, request)
end
13
# Starts a linked ErlPort Python instance with `python_path` as its module
# search path; the instance pid becomes this server's state.
# `:python.start_link/1` returns `{:error, reason}` on failure, which older
# OTP releases reject as a bad init/1 return value, so it is translated to
# the standard `{:stop, reason}`.
def init(python_path) do
  case :python.start_link(python_path: python_path) do
    {:ok, python} -> {:ok, python}
    {:error, reason} -> {:stop, reason}
  end
end
17
# Handles every call message: runs `worker.process_response(http_resp)` in the
# Python instance held as state, discards the Python result, and replies `:ok`.
def handle_call(http_resp, _from, py_instance) do
  _ = :python.call(py_instance, :worker, :process_response, [http_resp])
  {:reply, :ok, py_instance}
end
22 defp convert_resp(http_resp), do: # ...omitted...
23 end
1 from erlport import erlang
2
3
def send_new_urls(urls):
    """Ask the Elixir ``Downloader.Server`` to download ``urls``.

    Invokes ``Elixir.Downloader.Server.download_urls/1`` on the Erlang side
    through ErlPort and returns ``None``.

    ErlPort's ``erlang.call`` requires the module and function names to be
    ``Atom`` instances; passing plain ``str`` values does not name an Erlang
    module/function.

    NOTE(review): under Python 3, ErlPort encodes ``str`` as an Erlang
    charlist and ``bytes`` as a binary. Confirm that the URL type matches what
    the Elixir downloader expects.
    """
    from erlport.erlterms import Atom

    erlang.call(
        Atom(b"Elixir.Downloader.Server"), Atom(b"download_urls"), [urls]
    )
7
8
def process_response(response):
    """Process one HTTP response forwarded from Elixir.

    ``response`` is a sequence of ``(key, value)`` pairs, for example
    ``[("headers", [...]), ("body", [...]), ...]``, and must contain a
    ``"body"`` entry. Any URLs the processing step finds are sent back to
    the Elixir server for download. Always returns ``None``.
    """
    fields = dict(response)
    found_urls = _do_some_real_processing(fields["body"])
    if not found_urls:
        return None
    send_new_urls(found_urls)
16
17
18 def _do_some_real_processing(response_html): # ...omitted...
(all the code on GitHub)